Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 2235 invoked from network); 5 Mar 2010 03:12:36 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 5 Mar 2010 03:12:36 -0000 Received: (qmail 857 invoked by uid 500); 5 Mar 2010 03:12:24 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 789 invoked by uid 500); 5 Mar 2010 03:12:24 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 776 invoked by uid 99); 5 Mar 2010 03:12:23 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Mar 2010 03:12:23 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Mar 2010 03:12:17 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 8A18823888E7; Fri, 5 Mar 2010 03:11:57 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r919277 - in /hadoop/mapreduce/trunk: ./ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/ Date: Fri, 05 Mar 2010 03:11:57 -0000 To: mapreduce-commits@hadoop.apache.org From: cdouglas@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100305031157.8A18823888E7@eris.apache.org> Author: cdouglas Date: Fri Mar 5 03:11:56 2010 New Revision: 919277 URL: http://svn.apache.org/viewvc?rev=919277&view=rev Log: MAPREDUCE-1408. Add customizable job submission policies to Gridmix. 
Contributed by Rahul Singh Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StatListener.java hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java Modified: hadoop/mapreduce/trunk/CHANGES.txt hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Modified: hadoop/mapreduce/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=919277&r1=919276&r2=919277&view=diff ============================================================================== --- hadoop/mapreduce/trunk/CHANGES.txt (original) +++ hadoop/mapreduce/trunk/CHANGES.txt Fri Mar 5 03:11:56 2010 @@ -200,6 +200,9 @@ MAPREDUCE-1454. Quote user supplied strings in Tracker servlets. (cdouglas) + MAPREDUCE-1408. Add customizable job submission policies to Gridmix. (Rahul + Singh via cdouglas) + OPTIMIZATIONS MAPREDUCE-270. 
Fix the tasktracker to optionally send an out-of-band Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=919277&r1=919276&r2=919277&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Fri Mar 5 03:11:56 2010 @@ -32,7 +32,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; - +import org.apache.hadoop.tools.rumen.ZombieJobProducer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -82,6 +82,7 @@ private JobFactory factory; private JobSubmitter submitter; private JobMonitor monitor; + private Statistics statistics; // Shutdown hook private final Shutdown sdh = new Shutdown(); @@ -129,20 +130,38 @@ */ private void startThreads(Configuration conf, String traceIn, Path ioPath, Path scratchDir, CountDownLatch startFlag) throws IOException { - monitor = createJobMonitor(); - submitter = createJobSubmitter(monitor, - conf.getInt(GRIDMIX_SUB_THR, - Runtime.getRuntime().availableProcessors() + 1), - conf.getInt(GRIDMIX_QUE_DEP, 5), - new FilePool(conf, ioPath)); - factory = createJobFactory(submitter, traceIn, scratchDir, conf, startFlag); - monitor.start(); - submitter.start(); - factory.start(); - } + try { + GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy( + conf, GridmixJobSubmissionPolicy.STRESS); + LOG.info(" Submission policy is " + policy.name()); + statistics = new Statistics(conf, policy.getPollingInterval(), startFlag); + monitor = createJobMonitor(statistics); + int noOfSubmitterThreads = policy.name().equals( + GridmixJobSubmissionPolicy.SERIAL.name()) ? 
1 : + Runtime.getRuntime().availableProcessors() + 1; + + submitter = createJobSubmitter( + monitor, conf.getInt( + GRIDMIX_SUB_THR, noOfSubmitterThreads), + conf.getInt(GRIDMIX_QUE_DEP, 5), new FilePool(conf, ioPath)); + factory = createJobFactory( + submitter, traceIn, scratchDir, conf, startFlag); + if (policy.name().equals(GridmixJobSubmissionPolicy.SERIAL.name())) { + statistics.addJobStatsListeners(factory); + } else { + statistics.addClusterStatsObservers(factory); + } + + monitor.start(); + submitter.start(); + }catch(Exception e) { + LOG.error(" Exception at start " ,e); + throw new IOException(e); + } + } - protected JobMonitor createJobMonitor() throws IOException { - return new JobMonitor(); + protected JobMonitor createJobMonitor(Statistics stats) throws IOException { + return new JobMonitor(stats); } protected JobSubmitter createJobSubmitter(JobMonitor monitor, int threads, @@ -153,9 +172,11 @@ protected JobFactory createJobFactory(JobSubmitter submitter, String traceIn, Path scratchDir, Configuration conf, CountDownLatch startFlag) throws IOException { - return new JobFactory(submitter, createInputStream(traceIn), scratchDir, - conf, startFlag); - } + return GridmixJobSubmissionPolicy.getPolicy( + conf, GridmixJobSubmissionPolicy.STRESS).createJobFactory( + submitter, new ZombieJobProducer( + createInputStream( + traceIn), null), scratchDir, conf, startFlag); } public int run(String[] argv) throws IOException, InterruptedException { if (argv.length < 2) { @@ -196,6 +217,8 @@ } // scan input dir contents submitter.refreshFilePool(); + factory.start(); + statistics.start(); } catch (Throwable e) { LOG.error("Startup failed", e); if (factory != null) factory.abort(); // abort pipeline @@ -203,7 +226,6 @@ // signal for factory to start; sets start time startFlag.countDown(); } - if (factory != null) { // wait for input exhaustion factory.join(Long.MAX_VALUE); @@ -218,6 +240,10 @@ // wait for running tasks to complete monitor.shutdown(); monitor.join(Long.MAX_VALUE); + + statistics.shutdown(); + statistics.join(Long.MAX_VALUE); + } } finally { IOUtils.cleanup(LOG, trace); @@ -256,6 +282,7 @@ killComponent(factory, FAC_SLEEP); // read no more tasks killComponent(submitter, SUB_SLEEP); // submit no more tasks killComponent(monitor, MON_SLEEP); // process remaining jobs here + killComponent(statistics,MON_SLEEP); } finally { if (monitor == null) { return; Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java?rev=919277&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java (added) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java Fri Mar 5 03:11:56 2010 @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.gridmix; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.tools.rumen.JobStoryProducer; +import org.apache.hadoop.mapred.gridmix.Statistics.JobStats; +import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats; + +import java.util.concurrent.CountDownLatch; +import java.io.IOException; + +enum GridmixJobSubmissionPolicy { + + REPLAY("REPLAY", 320000) { + @Override + public JobFactory createJobFactory( + JobSubmitter submitter, JobStoryProducer producer, Path scratchDir, + Configuration conf, CountDownLatch startFlag) throws IOException { + return new ReplayJobFactory( + submitter, producer, scratchDir, conf, startFlag); + } + }, + + STRESS("STRESS", 5000) { + @Override + public JobFactory createJobFactory( + JobSubmitter submitter, JobStoryProducer producer, Path scratchDir, + Configuration conf, CountDownLatch startFlag) throws IOException { + return new StressJobFactory( + submitter, producer, scratchDir, conf, startFlag); + } + }, + + SERIAL("SERIAL", 0) { + @Override + public JobFactory createJobFactory( + JobSubmitter submitter, JobStoryProducer producer, Path scratchDir, + Configuration conf, CountDownLatch startFlag) throws IOException { + return new SerialJobFactory( + submitter, producer, scratchDir, conf, startFlag); + } + + @Override + public int getPollingInterval() { + return 0; + } + }; + + public static final String JOB_SUBMISSION_POLICY = + "gridmix.job-submission.policy"; + + private final String name; + private final int pollingInterval; + + GridmixJobSubmissionPolicy(String name, int pollingInterval) { + this.name = name; + this.pollingInterval = pollingInterval; + } + + public abstract JobFactory createJobFactory( + JobSubmitter submitter, JobStoryProducer producer, Path scratchDir, + Configuration conf, CountDownLatch startFlag) throws IOException; + + public int getPollingInterval() { + return pollingInterval; + } + + public static GridmixJobSubmissionPolicy getPolicy( + Configuration conf, GridmixJobSubmissionPolicy defaultPolicy) { + String policy = conf.get(JOB_SUBMISSION_POLICY, defaultPolicy.name()); + return valueOf(policy.toUpperCase()); + } +} Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java?rev=919277&r1=919276&r2=919277&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (original) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java Fri Mar 5 03:11:56 2010 @@ -17,15 +17,10 @@ */ package org.apache.hadoop.mapred.gridmix; -import java.io.IOException; -import java.io.InputStream; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobID; @@ -33,13 +28,16 @@ import org.apache.hadoop.tools.rumen.JobStory; import org.apache.hadoop.tools.rumen.JobStoryProducer; import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values; -import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants; import org.apache.hadoop.tools.rumen.TaskAttemptInfo; import org.apache.hadoop.tools.rumen.TaskInfo; import org.apache.hadoop.tools.rumen.ZombieJobProducer; +import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicInteger; /** @@ -49,19 +47,20 @@ * construction. * @see org.apache.hadoop.tools.rumen.HadoopLogsAnalyzer */ -class JobFactory implements Gridmix.Component { +abstract class JobFactory implements Gridmix.Component,StatListener { public static final Log LOG = LogFactory.getLog(JobFactory.class); - private final Path scratch; - private final float rateFactor; - private final Configuration conf; - private final ReaderThread rThread; - private final AtomicInteger sequence; - private final JobSubmitter submitter; - private final CountDownLatch startFlag; - private volatile IOException error = null; + protected final Path scratch; + protected final float rateFactor; + protected final Configuration conf; + protected final Thread rThread; + protected final AtomicInteger sequence; + protected final JobSubmitter submitter; + protected final CountDownLatch startFlag; + protected volatile IOException error = null; protected final JobStoryProducer jobProducer; + protected final ReentrantLock lock = new ReentrantLock(true); /** * Creating a new instance does not start the thread. @@ -71,6 +70,7 @@ * @param scratch Directory into which to write output from simulated jobs * @param conf Config passed to all jobs to be submitted * @param startFlag Latch released from main to start pipeline + * @throws java.io.IOException */ public JobFactory(JobSubmitter submitter, InputStream jobTrace, Path scratch, Configuration conf, CountDownLatch startFlag) @@ -96,7 +96,7 @@ this.conf = new Configuration(conf); this.submitter = submitter; this.startFlag = startFlag; - this.rThread = new ReaderThread(); + this.rThread = createReaderThread(); } static class MinTaskInfo extends TaskInfo { @@ -122,7 +122,7 @@ } } - static class FilterJobStory implements JobStory { + protected static class FilterJobStory implements JobStory { protected final JobStory job; @@ -154,75 +154,24 @@ } } - /** - * Worker thread responsible for reading descriptions, assigning sequence - * numbers, and normalizing time. - */ - private class ReaderThread extends Thread { - - public ReaderThread() { - super("GridmixJobFactory"); - } + protected abstract Thread createReaderThread() ; - private JobStory getNextJobFiltered() throws IOException { - JobStory job; - do { - job = jobProducer.getNextJob(); - } while (job != null - && (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS || - job.getSubmissionTime() < 0)); - return null == job ? 
null : new FilterJobStory(job) { - @Override - public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) { - return new MinTaskInfo(this.job.getTaskInfo(taskType, taskNumber)); - } - }; - } - - @Override - public void run() { - try { - startFlag.await(); - if (Thread.currentThread().isInterrupted()) { - return; - } - final long initTime = TimeUnit.MILLISECONDS.convert( - System.nanoTime(), TimeUnit.NANOSECONDS); - LOG.debug("START @ " + initTime); - long first = -1; - long last = -1; - while (!Thread.currentThread().isInterrupted()) { - try { - final JobStory job = getNextJobFiltered(); - if (null == job) { - return; - } - if (first < 0) { - first = job.getSubmissionTime(); - } - final long current = job.getSubmissionTime(); - if (current < last) { - LOG.warn("Job " + job.getJobID() + " out of order"); - continue; - } - last = current; - submitter.add(new GridmixJob(conf, initTime + - Math.round(rateFactor * (current - first)), - job, scratch, sequence.getAndIncrement())); - } catch (IOException e) { - JobFactory.this.error = e; - return; - } - } - } catch (InterruptedException e) { - // exit thread; ignore any jobs remaining in the trace - return; - } finally { - IOUtils.cleanup(null, jobProducer); + protected JobStory getNextJobFiltered() throws IOException { + JobStory job; + do { + job = jobProducer.getNextJob(); + } while (job != null && + (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS || + job.getSubmissionTime() < 0)); + return null == job ? null : new FilterJobStory(job) { + @Override + public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) { + return new MinTaskInfo(this.job.getTaskInfo(taskType, taskNumber)); } - } + }; } + /** * Obtain the error that caused the thread to exit unexpectedly. */ Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java?rev=919277&r1=919276&r2=919277&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java (original) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java Fri Mar 5 03:11:56 2010 @@ -47,14 +47,12 @@ private final MonitorThread mThread; private final BlockingQueue runningJobs; private final long pollDelayMillis; + private Statistics statistics; private boolean graceful = false; private boolean shutdown = false; - /** - * Create a JobMonitor with a default polling interval of 5s. - */ - public JobMonitor() { - this(5, TimeUnit.SECONDS); + public JobMonitor(Statistics statistics) { + this(5,TimeUnit.SECONDS, statistics); } /** @@ -62,12 +60,14 @@ * polling a still-running job. * @param pollDelay Delay after polling a running job * @param unit Time unit for pollDelaySec (rounded to milliseconds) + * @param statistics StatCollector , listener to job completion. */ - public JobMonitor(int pollDelay, TimeUnit unit) { + public JobMonitor(int pollDelay, TimeUnit unit, Statistics statistics) { mThread = new MonitorThread(); runningJobs = new LinkedBlockingQueue(); mJobs = new LinkedList(); this.pollDelayMillis = TimeUnit.MILLISECONDS.convert(pollDelay, unit); + this.statistics = statistics; } /** @@ -78,6 +78,17 @@ } /** + * Add a submission failed job , such tht it can be communicated + * back to serial. 
+ * TODO: Cleaner solution for this problem + * @param job + */ + public void submissionFailed(Job job) { + LOG.info(" Job submission failed notify if anyone is waiting " + job); + this.statistics.add(job); + } + + /** * Temporary hook for recording job success. */ protected void onSuccess(Job job) { @@ -162,6 +173,7 @@ try { if (job.isComplete()) { process(job); + statistics.add(job); continue; } } catch (IOException e) { Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java?rev=919277&r1=919276&r2=919277&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java (original) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java Fri Mar 5 03:11:56 2010 @@ -82,6 +82,11 @@ job.buildSplits(inputDir); } catch (IOException e) { LOG.warn("Failed to submit " + job.getJob().getJobName(), e); + monitor.submissionFailed(job.getJob()); + return; + } catch (Exception e) { + LOG.warn("Failed to submit " + job.getJob().getJobName(), e); + monitor.submissionFailed(job.getJob()); return; } // Sleep until deadline @@ -101,14 +106,22 @@ throw new InterruptedException("Failed to submit " + job.getJob().getJobName()); } + monitor.submissionFailed(job.getJob()); } catch (ClassNotFoundException e) { LOG.warn("Failed to submit " + job.getJob().getJobName(), e); + monitor.submissionFailed(job.getJob()); } } catch (InterruptedException e) { // abort execution, remove splits if nesc // TODO release ThdLoc GridmixJob.pullDescription(job.id()); Thread.currentThread().interrupt(); + monitor.submissionFailed(job.getJob()); + return; + } catch(Exception e) { + //Due to some exception job wasnt submitted. + LOG.info(" Job " + job.getJob() + " submission failed " , e); + monitor.submissionFailed(job.getJob()); return; } finally { sem.release(); Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java?rev=919277&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java (added) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java Fri Mar 5 03:11:56 2010 @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.gridmix; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.tools.rumen.JobStory; +import org.apache.hadoop.tools.rumen.JobStoryProducer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + + class ReplayJobFactory extends JobFactory { + public static final Log LOG = LogFactory.getLog(ReplayJobFactory.class); + + /** + * Creating a new instance does not start the thread. + * + * @param submitter Component to which deserialized jobs are passed + * @param jobProducer Job story producer + * {@link org.apache.hadoop.tools.rumen.ZombieJobProducer} + * @param scratch Directory into which to write output from simulated jobs + * @param conf Config passed to all jobs to be submitted + * @param startFlag Latch released from main to start pipeline + * @throws java.io.IOException + */ + public ReplayJobFactory( + JobSubmitter submitter, JobStoryProducer jobProducer, Path scratch, + Configuration conf, CountDownLatch startFlag) + throws IOException { + super(submitter, jobProducer, scratch, conf, startFlag); + } + + + @Override + public Thread createReaderThread() { + return new ReplayReaderThread("ReplayJobFactory"); + } + + /** + * @param item + */ + public void update(Statistics.ClusterStats item) { + } + + private class ReplayReaderThread extends Thread { + + public ReplayReaderThread(String threadName) { + super(threadName); + } + + + public void run() { + try { + startFlag.await(); + if (Thread.currentThread().isInterrupted()) { + return; + } + final long initTime = TimeUnit.MILLISECONDS.convert( + System.nanoTime(), TimeUnit.NANOSECONDS); + LOG.info("START REPLAY @ " + initTime); + long first = -1; + long last = -1; + while (!Thread.currentThread().isInterrupted()) { + try { + final JobStory job = getNextJobFiltered(); + if (null == job) { + return; + } + if (first < 0) { + first = job.getSubmissionTime(); + } + final long current = job.getSubmissionTime(); + if (current < last) { + LOG.warn("Job " + job.getJobID() + " out of order"); + continue; + } + last = current; + submitter.add( + new GridmixJob( + conf, initTime + Math.round(rateFactor * (current - first)), + job, scratch,sequence.getAndIncrement())); + } catch (IOException e) { + error = e; + return; + } + } + } catch (InterruptedException e) { + // exit thread; ignore any jobs remaining in the trace + } finally { + IOUtils.cleanup(null, jobProducer); + } + } + } + + /** + * Start the reader thread, wait for latch if necessary. 
+ */ + @Override + public void start() { + this.rThread.start(); + } + +} \ No newline at end of file Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java?rev=919277&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java (added) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java Fri Mar 5 03:11:56 2010 @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.gridmix; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.tools.rumen.JobStory; +import org.apache.hadoop.tools.rumen.JobStoryProducer; +import org.apache.hadoop.mapred.gridmix.Statistics.JobStats; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.Condition; + +public class SerialJobFactory extends JobFactory { + + public static final Log LOG = LogFactory.getLog(SerialJobFactory.class); + private final Condition jobCompleted = lock.newCondition(); + + /** + * Creating a new instance does not start the thread. + * + * @param submitter Component to which deserialized jobs are passed + * @param jobProducer Job story producer + * {@link org.apache.hadoop.tools.rumen.ZombieJobProducer} + * @param scratch Directory into which to write output from simulated jobs + * @param conf Config passed to all jobs to be submitted + * @param startFlag Latch released from main to start pipeline + * @throws java.io.IOException + */ + public SerialJobFactory( + JobSubmitter submitter, JobStoryProducer jobProducer, Path scratch, + Configuration conf, CountDownLatch startFlag) + throws IOException { + super(submitter, jobProducer, scratch, conf, startFlag); + } + + @Override + public Thread createReaderThread() { + return new SerialReaderThread("SerialJobFactory"); + } + + private class SerialReaderThread extends Thread { + + public SerialReaderThread(String threadName) { + super(threadName); + } + + /** + * SERIAL : In this scenario . method waits on notification , + * that a submitted job is actually completed. Logic is simple. + * === + * while(true) { + * wait till previousjob is completed. + * break; + * } + * submit newJob. + * previousJob = newJob; + * == + */ + @Override + public void run() { + try { + startFlag.await(); + if (Thread.currentThread().isInterrupted()) { + return; + } + LOG.info("START SERIAL @ " + System.currentTimeMillis()); + GridmixJob prevJob; + while (!Thread.currentThread().isInterrupted()) { + final JobStory job; + try { + job = getNextJobFiltered(); + if (null == job) { + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug( + "Serial mode submitting job " + job.getName()); + } + prevJob = new GridmixJob( + conf, 0L, job, scratch,sequence.getAndIncrement()); + + lock.lock(); + try { + LOG.info(" Submitted the job " + prevJob); + submitter.add(prevJob); + } finally { + lock.unlock(); + } + } catch (IOException e) { + error = e; + //If submission of current job fails , try to submit the next job. + return; + } + + if (prevJob != null) { + //Wait till previous job submitted is completed. 
+ lock.lock(); + try { + while (true) { + try { + jobCompleted.await(); + } catch (InterruptedException ie) { + LOG.error( + " Error in SerialJobFactory while waiting for job completion ", + ie); + return; + } + if (LOG.isDebugEnabled()) { + LOG.info(" job " + job.getName() + " completed "); + } + break; + } + } finally { + lock.unlock(); + } + prevJob = null; + } + } + } catch (InterruptedException e) { + return; + } finally { + IOUtils.cleanup(null, jobProducer); + } + } + + } + + /** + * SERIAL. Once you get notification from StatsCollector about the job + * completion ,simply notify the waiting thread. + * + * @param item + */ + @Override + public void update(Statistics.JobStats item) { + //simply notify in case of serial submissions. We are just bothered + //if submitted job is completed or not. + lock.lock(); + try { + jobCompleted.signalAll(); + } finally { + lock.unlock(); + } + } + + /** + * Start the reader thread, wait for latch if necessary. + */ + @Override + public void start() { + LOG.info(" Starting Serial submission "); + this.rThread.start(); + } +} Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StatListener.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StatListener.java?rev=919277&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StatListener.java (added) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StatListener.java Fri Mar 5 03:11:56 2010 @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.gridmix; + +/** + * Stat listener. + * @param + */ +interface StatListener{ + + /** + * + * @param item + */ + void update(T item); +} Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java?rev=919277&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java (added) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java Fri Mar 5 03:11:56 2010 @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.gridmix; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.gridmix.Gridmix.Component; +import org.apache.hadoop.mapreduce.Cluster; +import org.apache.hadoop.mapreduce.ClusterMetrics; +import org.apache.hadoop.mapreduce.Job; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Component collecting the stats required by other components + * to make decisions. + * Single thread Collector tries to collec the stats. + * Each of thread poll updates certain datastructure(Currently ClusterStats). + * Components interested in these datastructure, need to register. + * StatsCollector notifies each of the listeners. + */ +public class Statistics implements Component { + public static final Log LOG = LogFactory.getLog(Statistics.class); + + private final StatCollector statistics = new StatCollector(); + private final Cluster cluster; + + //List of cluster status listeners. + private final List> clusterStatlisteners = + new ArrayList>(); + + //List of job status listeners. + private final List> jobStatListeners = + new ArrayList>(); + + private int completedJobsInCurrentInterval = 0; + private final int jtPollingInterval; + private volatile boolean shutdown = false; + private final int maxJobCompletedInInterval; + private static final String MAX_JOBS_COMPLETED_IN_POLL_INTERVAL_KEY = + "gridmix.max-jobs-completed-in-poll-interval"; + private final ReentrantLock lock = new ReentrantLock(); + private final Condition jobCompleted = lock.newCondition(); + private final CountDownLatch startFlag; + + public Statistics( + Configuration conf, int pollingInterval, CountDownLatch startFlag) + throws IOException { + this.cluster = new Cluster(conf); + this.jtPollingInterval = pollingInterval; + maxJobCompletedInInterval = conf.getInt( + MAX_JOBS_COMPLETED_IN_POLL_INTERVAL_KEY, 1); + this.startFlag = startFlag; + } + + /** + * Used by JobMonitor to add the completed job. + */ + @Override + public void add(Job job) { + //This thread will be notified initially by jobmonitor incase of + //data generation. Ignore that as we are getting once the input is + //generated. + if (!statistics.isAlive()) { + return; + } + completedJobsInCurrentInterval++; + //check if we have reached the maximum level of job completions. + if (completedJobsInCurrentInterval >= maxJobCompletedInInterval) { + if (LOG.isDebugEnabled()) { + LOG.debug( + " Reached maximum limit of jobs in a polling interval " + + completedJobsInCurrentInterval); + } + completedJobsInCurrentInterval = 0; + lock.lock(); + try { + //Job is completed notify all the listeners. + if (jobStatListeners.size() > 0) { + for (StatListener l : jobStatListeners) { + JobStats stats = new JobStats(); + stats.setCompleteJob(job); + l.update(stats); + } + } + this.jobCompleted.signalAll(); + } finally { + lock.unlock(); + } + } + } + + //TODO: We have just 2 types of listeners as of now . 
If no of listeners + //increase then we should move to map kind of model. + + public void addClusterStatsObservers(StatListener listener) { + clusterStatlisteners.add(listener); + } + + public void addJobStatsListeners(StatListener listener) { + this.jobStatListeners.add(listener); + } + + /** + * Attempt to start the service. + */ + @Override + public void start() { + statistics.start(); + } + + private class StatCollector extends Thread { + + StatCollector() { + super("StatsCollectorThread"); + } + + public void run() { + try { + startFlag.await(); + if (Thread.currentThread().isInterrupted()) { + return; + } + } catch (InterruptedException ie) { + LOG.error( + "Statistics Error while waiting for other threads to get ready ", ie); + return; + } + while (!shutdown) { + lock.lock(); + try { + jobCompleted.await(jtPollingInterval, TimeUnit.MILLISECONDS); + } catch (InterruptedException ie) { + LOG.error( + "Statistics interrupt while waiting for polling " + ie.getCause(), + ie); + return; + } finally { + lock.unlock(); + } + + //Fetch cluster data only if required.i.e . + // only if there are clusterStats listener. + if (clusterStatlisteners.size() > 0) { + try { + ClusterMetrics clusterStatus = cluster.getClusterStatus(); + Job[] allJobs = cluster.getAllJobs(); + List runningWaitingJobs = getRunningWaitingJobs(allJobs); + updateAndNotifyClusterStatsListeners( + clusterStatus, runningWaitingJobs); + } catch (IOException e) { + LOG.error( + "Statistics io exception while polling JT ", e); + return; + } catch (InterruptedException e) { + LOG.error( + "Statistics interrupt exception while polling JT ", e); + return; + } + } + } + } + + private void updateAndNotifyClusterStatsListeners( + ClusterMetrics clusterMetrics, List runningWaitingJobs) { + ClusterStats stats = ClusterStats.getClusterStats(); + stats.setClusterMetric(clusterMetrics); + stats.setRunningWaitingJobs(runningWaitingJobs); + for (StatListener listener : clusterStatlisteners) { + listener.update(stats); + } + } + + + /** + * From the list of Jobs , give the list of jobs whoes state is eigther + * PREP or RUNNING. + * + * @param allJobs + * @return + * @throws java.io.IOException + * @throws InterruptedException + */ + private List getRunningWaitingJobs(Job[] allJobs) + throws IOException, InterruptedException { + List result = new ArrayList(); + for (Job job : allJobs) { + //TODO Check if job.getStatus() makes a rpc call + org.apache.hadoop.mapreduce.JobStatus.State state = + job.getStatus().getState(); + if (org.apache.hadoop.mapreduce.JobStatus.State.PREP.equals(state) || + org.apache.hadoop.mapreduce.JobStatus.State.RUNNING.equals(state)) { + result.add(job); + } + } + return result; + } + } + + /** + * Wait until the service completes. It is assumed that either a + * {@link #shutdown} or {@link #abort} has been requested. + */ + @Override + public void join(long millis) throws InterruptedException { + statistics.join(millis); + } + + @Override + public void shutdown() { + shutdown = true; + clusterStatlisteners.clear(); + jobStatListeners.clear(); + statistics.interrupt(); + } + + @Override + public void abort() { + shutdown = true; + clusterStatlisteners.clear(); + jobStatListeners.clear(); + statistics.interrupt(); + } + + /** + * Class to encapsulate the JobStats information. + * Current we just need information about completedJob. + * TODO: In future we need to extend this to send more information. 
+ */ + static class JobStats { + private Job completedJob; + + public Job getCompleteJob() { + return completedJob; + } + + public void setCompleteJob(Job job) { + this.completedJob = job; + } + } + + static class ClusterStats { + private ClusterMetrics status = null; + private static ClusterStats stats = new ClusterStats(); + private List runningWaitingJobs; + + private ClusterStats() { + + } + + /** + * @return stats + */ + static ClusterStats getClusterStats() { + return stats; + } + + /** + * @param metrics + */ + void setClusterMetric(ClusterMetrics metrics) { + this.status = metrics; + } + + /** + * @return metrics + */ + public ClusterMetrics getStatus() { + return status; + } + + /** + * @return runningWatitingJobs + */ + public List getRunningWaitingJobs() { + return runningWaitingJobs; + } + + public void setRunningWaitingJobs(List runningWaitingJobs) { + this.runningWaitingJobs = runningWaitingJobs; + } + + } +} Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java?rev=919277&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java (added) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java Fri Mar 5 03:11:56 2010 @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.gridmix; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.ClusterMetrics; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.tools.rumen.JobStory; +import org.apache.hadoop.tools.rumen.JobStoryProducer; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.Condition; + +public class StressJobFactory extends JobFactory { + public static final Log LOG = LogFactory.getLog(StressJobFactory.class); + + private LoadStatus loadStatus = new LoadStatus(); + private List runningWaitingJobs; + private final Condition overloaded = this.lock.newCondition(); + /** + * The minimum ratio between pending+running map tasks (aka. incomplete map + * tasks) and cluster map slot capacity for us to consider the cluster is + * overloaded. For running maps, we only count them partially. Namely, a 40% + * completed map is counted as 0.6 map tasks in our calculation. + */ + static final float OVERLAOD_MAPTASK_MAPSLOT_RATIO = 2.0f; + + /** + * Creating a new instance does not start the thread. + * + * @param submitter Component to which deserialized jobs are passed + * @param jobProducer Stream of job traces with which to construct a + * {@link org.apache.hadoop.tools.rumen.ZombieJobProducer} + * @param scratch Directory into which to write output from simulated jobs + * @param conf Config passed to all jobs to be submitted + * @param startFlag Latch released from main to start pipeline + * @throws java.io.IOException + */ + public StressJobFactory( + JobSubmitter submitter, JobStoryProducer jobProducer, Path scratch, + Configuration conf, CountDownLatch startFlag) throws IOException { + super( + submitter, jobProducer, scratch, conf, startFlag); + + //Setting isOverloaded as true , now JF would wait for atleast first + //set of ClusterStats based on which it can decide how many job it has + //to submit. + this.loadStatus.isOverloaded = true; + } + + public Thread createReaderThread() { + return new StressReaderThread("StressJobFactory"); + } + + /* + * Worker thread responsible for reading descriptions, assigning sequence + * numbers, and normalizing time. + */ + private class StressReaderThread extends Thread { + + public StressReaderThread(String name) { + super(name); + } + + /** + * STRESS: Submits the job in STRESS mode. + * while(JT is overloaded) { + * wait(); + * } + * If not overloaded , get number of slots available. + * Keep submitting the jobs till ,total jobs is sufficient to + * load the JT. + * That is submit (Sigma(no of maps/Job)) > (2 * no of slots available) + */ + public void run() { + try { + startFlag.await(); + if (Thread.currentThread().isInterrupted()) { + return; + } + LOG.info("START STRESS @ " + System.currentTimeMillis()); + while (!Thread.currentThread().isInterrupted()) { + lock.lock(); + try { + while (loadStatus.isOverloaded) { + //Wait while JT is overloaded. 
+ try { + overloaded.await(); + } catch (InterruptedException ie) { + return; + } + } + + int noOfSlotsAvailable = loadStatus.numSlotsBackfill; + LOG.info(" No of slots to be backfilled are " + noOfSlotsAvailable); + + for (int i = 0; i < noOfSlotsAvailable; i++) { + try { + final JobStory job = getNextJobFiltered(); + if (null == job) { + return; + } + //TODO: We need to take care of scenario when one map takes more + //than 1 slot. + i += job.getNumberMaps(); + + submitter.add( + new GridmixJob( + conf, 0L, job, scratch, sequence.getAndIncrement())); + } catch (IOException e) { + LOG.error(" EXCEPTOIN in availableSlots ", e); + error = e; + return; + } + + } + } finally { + lock.unlock(); + } + } + } catch (InterruptedException e) { + return; + } finally { + IOUtils.cleanup(null, jobProducer); + } + } + } + + /** + *

+ * STRESS Once you get the notification from StatsCollector.Collect the + * clustermetrics. Update current loadStatus with new load status of JT. + * + * @param item + */ + @Override + public void update(Statistics.ClusterStats item) { + lock.lock(); + try { + ClusterMetrics clusterMetrics = item.getStatus(); + LoadStatus newStatus; + runningWaitingJobs = item.getRunningWaitingJobs(); + newStatus = checkLoadAndGetSlotsToBackfill(clusterMetrics); + loadStatus.isOverloaded = newStatus.isOverloaded; + loadStatus.numSlotsBackfill = newStatus.numSlotsBackfill; + overloaded.signalAll(); + } finally { + lock.unlock(); + } + } + + /** + * We try to use some light-weight mechanism to determine cluster load. + * + * @param clusterStatus + * @return Whether, from job client perspective, the cluster is overloaded. + */ + private LoadStatus checkLoadAndGetSlotsToBackfill( + ClusterMetrics clusterStatus) { + LoadStatus loadStatus = new LoadStatus(); + // If there are more jobs than number of task trackers, we assume the + // cluster is overloaded. This is to bound the memory usage of the + // simulator job tracker, in situations where we have jobs with small + // number of map tasks and large number of reduce tasks. + if (runningWaitingJobs.size() >= clusterStatus.getTaskTrackerCount()) { + if (LOG.isDebugEnabled()) { + LOG.debug( + System.currentTimeMillis() + " Overloaded is " + + Boolean.TRUE.toString() + " #runningJobs >= taskTrackerCount (" + + runningWaitingJobs.size() + " >= " + + clusterStatus.getTaskTrackerCount() + " )\n"); + } + loadStatus.isOverloaded = true; + loadStatus.numSlotsBackfill = 0; + return loadStatus; + } + + float incompleteMapTasks = 0; // include pending & running map tasks. + for (Job job : runningWaitingJobs) { + try{ + incompleteMapTasks += (1 - Math.min( + job.getStatus().getMapProgress(), 1.0)) * + ((JobConf) job.getConfiguration()).getNumMapTasks(); + }catch(IOException io) { + //There might be issues with this current job + //try others + LOG.error(" Error while calculating load " , io); + continue; + }catch(InterruptedException ie) { + //There might be issues with this current job + //try others + LOG.error(" Error while calculating load " , ie); + continue; + } + } + + float overloadedThreshold = + OVERLAOD_MAPTASK_MAPSLOT_RATIO * clusterStatus.getMapSlotCapacity(); + boolean overloaded = incompleteMapTasks > overloadedThreshold; + String relOp = (overloaded) ? ">" : "<="; + if (LOG.isDebugEnabled()) { + LOG.info( + System.currentTimeMillis() + " Overloaded is " + Boolean.toString( + overloaded) + " incompleteMapTasks " + relOp + " " + + OVERLAOD_MAPTASK_MAPSLOT_RATIO + "*mapSlotCapacity" + "(" + + incompleteMapTasks + " " + relOp + " " + + OVERLAOD_MAPTASK_MAPSLOT_RATIO + "*" + + clusterStatus.getMapSlotCapacity() + ")"); + } + if (overloaded) { + loadStatus.isOverloaded = true; + loadStatus.numSlotsBackfill = 0; + } else { + loadStatus.isOverloaded = false; + loadStatus.numSlotsBackfill = + (int) (overloadedThreshold - incompleteMapTasks); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Current load Status is " + loadStatus); + } + return loadStatus; + } + + static class LoadStatus { + volatile boolean isOverloaded = false; + volatile int numSlotsBackfill = -1; + + public String toString() { + return " is Overloaded " + isOverloaded + " no of slots available " + + numSlotsBackfill; + } + } + + /** + * Start the reader thread, wait for latch if necessary. 
+ */ + @Override + public void start() { + LOG.info(" Starting Stress submission "); + this.rThread.start(); + } + +} Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java?rev=919277&r1=919276&r2=919277&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java (original) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java Fri Mar 5 03:11:56 2010 @@ -17,261 +17,90 @@ */ package org.apache.hadoop.mapred.gridmix; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.tools.rumen.JobStory; -import org.apache.hadoop.tools.rumen.JobStoryProducer; -import org.apache.hadoop.tools.rumen.TaskAttemptInfo; -import org.apache.hadoop.tools.rumen.TaskInfo; -import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; /** * Component generating random job traces for testing on a single node. 
 */
-class DebugJobFactory extends JobFactory {
-
-  public DebugJobFactory(JobSubmitter submitter, Path scratch, int numJobs,
-      Configuration conf, CountDownLatch startFlag) throws IOException {
-    super(submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
-        startFlag);
-  }
+class DebugJobFactory {
 
-  ArrayList<JobStory> getSubmitted() {
-    return ((DebugJobProducer)jobProducer).submitted;
+  interface Debuggable {
+    ArrayList<JobStory> getSubmitted();
   }
 
-  private static class DebugJobProducer implements JobStoryProducer {
-    final ArrayList<JobStory> submitted;
-    private final Configuration conf;
-    private final AtomicInteger numJobs;
-
-    public DebugJobProducer(int numJobs, Configuration conf) {
-      super();
-      this.conf = conf;
-      this.numJobs = new AtomicInteger(numJobs);
-      this.submitted = new ArrayList<JobStory>();
-    }
+  public static JobFactory getFactory(
+    JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
+    CountDownLatch startFlag) throws IOException {
+    GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
+      conf, GridmixJobSubmissionPolicy.STRESS);
+    if (policy.name().equalsIgnoreCase("REPLAY")) {
+      return new DebugReplayJobFactory(
+        submitter, scratch, numJobs, conf, startFlag);
+    } else if (policy.name().equalsIgnoreCase("STRESS")) {
+      return new DebugStressJobFactory(
+        submitter, scratch, numJobs, conf, startFlag);
+    } else if (policy.name().equalsIgnoreCase("SERIAL")) {
+      return new DebugSerialJobFactory(
+        submitter, scratch, numJobs, conf, startFlag);
 
-    @Override
-    public JobStory getNextJob() throws IOException {
-      if (numJobs.getAndDecrement() > 0) {
-        final MockJob ret = new MockJob(conf);
-        submitted.add(ret);
-        return ret;
-      }
-      return null;
     }
-
-    @Override
-    public void close() { }
-  }
-
-  static double[] getDistr(Random r, double mindist, int size) {
-    assert 0.0 <= mindist && mindist <= 1.0;
-    final double min = mindist / size;
-    final double rem = 1.0 - min * size;
-    final double[] tmp = new double[size];
-    for (int i = 0; i < tmp.length - 1; ++i) {
-      tmp[i] = r.nextDouble() * rem;
-    }
-    tmp[tmp.length - 1] = rem;
-    Arrays.sort(tmp);
-
-    final double[] ret = new double[size];
-    ret[0] = tmp[0] + min;
-    for (int i = 1; i < size; ++i) {
-      ret[i] = tmp[i] - tmp[i-1] + min;
-    }
-    return ret;
+    return null;
   }
 
-  /**
-   * Generate random task data for a synthetic job.
-   */
-  static class MockJob implements JobStory {
-
-    static final int MIN_REC = 1 << 14;
-    static final int MIN_BYTES = 1 << 20;
-    static final int VAR_REC = 1 << 14;
-    static final int VAR_BYTES = 4 << 20;
-    static final int MAX_MAP = 5;
-    static final int MAX_RED = 3;
-
-    static void initDist(Random r, double min, int[] recs, long[] bytes,
-        long tot_recs, long tot_bytes) {
-      final double[] recs_dist = getDistr(r, min, recs.length);
-      final double[] bytes_dist = getDistr(r, min, recs.length);
-      long totalbytes = 0L;
-      int totalrecs = 0;
-      for (int i = 0; i < recs.length; ++i) {
-        recs[i] = (int) Math.round(tot_recs * recs_dist[i]);
-        bytes[i] = Math.round(tot_bytes * bytes_dist[i]);
-        totalrecs += recs[i];
-        totalbytes += bytes[i];
-      }
-      // Add/remove excess
-      recs[0] += totalrecs - tot_recs;
-      bytes[0] += totalbytes - tot_bytes;
-      if (LOG.isInfoEnabled()) {
-        LOG.info("DIST: " + Arrays.toString(recs) + " " +
-            tot_recs + "/" + totalrecs + " " +
-            Arrays.toString(bytes) + " " + tot_bytes + "/" + totalbytes);
-      }
-    }
-
-    private static final AtomicInteger seq = new AtomicInteger(0);
-    // set timestamps in the past
-    private static final AtomicLong timestamp =
-      new AtomicLong(System.currentTimeMillis() -
-          TimeUnit.MILLISECONDS.convert(60, TimeUnit.DAYS));
-
-    private final int id;
-    private final String name;
-    private final int[] m_recsIn, m_recsOut, r_recsIn, r_recsOut;
-    private final long[] m_bytesIn, m_bytesOut, r_bytesIn, r_bytesOut;
-    private final long submitTime;
-
-    public MockJob(Configuration conf) {
-      final Random r = new Random();
-      final long seed = r.nextLong();
-      r.setSeed(seed);
-      id = seq.getAndIncrement();
-      name = String.format("MOCKJOB%05d", id);
-      LOG.info(name + " (" + seed + ")");
-      submitTime = timestamp.addAndGet(TimeUnit.MILLISECONDS.convert(
-            r.nextInt(10), TimeUnit.SECONDS));
-
-      m_recsIn = new int[r.nextInt(MAX_MAP) + 1];
-      m_bytesIn = new long[m_recsIn.length];
-      m_recsOut = new int[m_recsIn.length];
-      m_bytesOut = new long[m_recsIn.length];
-
-      r_recsIn = new int[r.nextInt(MAX_RED) + 1];
-      r_bytesIn = new long[r_recsIn.length];
-      r_recsOut = new int[r_recsIn.length];
-      r_bytesOut = new long[r_recsIn.length];
-
-      // map input
-      final long map_recs = r.nextInt(VAR_REC) + MIN_REC;
-      final long map_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
-      initDist(r, 0.5, m_recsIn, m_bytesIn, map_recs, map_bytes);
-
-      // shuffle
-      final long shuffle_recs = r.nextInt(VAR_REC) + MIN_REC;
-      final long shuffle_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
-      initDist(r, 0.4, m_recsOut, m_bytesOut, shuffle_recs, shuffle_bytes);
-      initDist(r, 0.8, r_recsIn, r_bytesIn, shuffle_recs, shuffle_bytes);
-
-      // reduce output
-      final long red_recs = r.nextInt(VAR_REC) + MIN_REC;
-      final long red_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
-      initDist(r, 0.4, r_recsOut, r_bytesOut, red_recs, red_bytes);
-
-      if (LOG.isDebugEnabled()) {
-        int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
-        int iMapRTotal = 0, oMapRTotal = 0, iRedRTotal = 0, oRedRTotal = 0;
-        for (int i = 0; i < m_recsIn.length; ++i) {
-          iMapRTotal += m_recsIn[i];
-          iMapBTotal += m_bytesIn[i];
-          oMapRTotal += m_recsOut[i];
-          oMapBTotal += m_bytesOut[i];
-        }
-        for (int i = 0; i < r_recsIn.length; ++i) {
-          iRedRTotal += r_recsIn[i];
-          iRedBTotal += r_bytesIn[i];
-          oRedRTotal += r_recsOut[i];
-          oRedBTotal += r_bytesOut[i];
-        }
-        LOG.debug(String.format("%s: M (%03d) %6d/%10d -> %6d/%10d" +
-            " R (%03d) %6d/%10d -> %6d/%10d @%d", name,
-            m_bytesIn.length, iMapRTotal, iMapBTotal, oMapRTotal, oMapBTotal,
-            r_bytesIn.length, iRedRTotal, iRedBTotal, oRedRTotal, oRedBTotal,
-            submitTime));
-      }
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public String getUser() {
-      return "FOOBAR";
-    }
-
-    @Override
-    public JobID getJobID() {
-      return new JobID("job_mock_" + name, id);
-    }
-
-    @Override
-    public Values getOutcome() {
-      return Values.SUCCESS;
-    }
-
-    @Override
-    public long getSubmissionTime() {
-      return submitTime;
-    }
-
-    @Override
-    public int getNumberMaps() {
-      return m_bytesIn.length;
+  static class DebugReplayJobFactory extends ReplayJobFactory
+    implements Debuggable {
+    public DebugReplayJobFactory(
+      JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
+      CountDownLatch startFlag) throws IOException {
+      super(
+        submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
+        startFlag);
     }
 
     @Override
-    public int getNumberReduces() {
-      return r_bytesIn.length;
+    public ArrayList<JobStory> getSubmitted() {
+      return ((DebugJobProducer) jobProducer).submitted;
     }
 
-    @Override
-    public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
-      switch (taskType) {
-        case MAP:
-          return new TaskInfo(m_bytesIn[taskNumber], m_recsIn[taskNumber],
-              m_bytesOut[taskNumber], m_recsOut[taskNumber], -1);
-        case REDUCE:
-          return new TaskInfo(r_bytesIn[taskNumber], r_recsIn[taskNumber],
-              r_bytesOut[taskNumber], r_recsOut[taskNumber], -1);
-        default:
-          throw new IllegalArgumentException("Not interested");
-      }
-    }
+  }
 
-    @Override
-    public InputSplit[] getInputSplits() {
-      throw new UnsupportedOperationException();
+  static class DebugSerialJobFactory extends SerialJobFactory
+    implements Debuggable {
+    public DebugSerialJobFactory(
+      JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
+      CountDownLatch startFlag) throws IOException {
+      super(
+        submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
+        startFlag);
     }
 
     @Override
-    public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType,
-        int taskNumber, int taskAttemptNumber) {
-      throw new UnsupportedOperationException();
+    public ArrayList<JobStory> getSubmitted() {
+      return ((DebugJobProducer) jobProducer).submitted;
     }
+  }
 
-    @Override
-    public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
-        int taskAttemptNumber, int locality) {
-      throw new UnsupportedOperationException();
+  static class DebugStressJobFactory extends StressJobFactory
+    implements Debuggable {
+    public DebugStressJobFactory(
+      JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
+      CountDownLatch startFlag) throws IOException {
+      super(
+        submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
+        startFlag);
     }
 
     @Override
-    public org.apache.hadoop.mapred.JobConf getJobConf() {
-      throw new UnsupportedOperationException();
+    public ArrayList<JobStory> getSubmitted() {
+      return ((DebugJobProducer) jobProducer).submitted;
     }
   }
+}
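A note on the new factory API above: getFactory() reads the submission
policy from the Configuration, defaulting to STRESS, and returns the
matching Debug*JobFactory. If no branch matches, it falls through and
returns null, so callers should treat the result as possibly null; a
switch on the enum would avoid the string comparisons, but the behavior
is the same. A minimal caller sketch, assuming the submitter, scratch,
numJobs, and startFlag values are supplied by the surrounding harness:

    // Hypothetical caller: request the SERIAL policy explicitly.
    Configuration conf = new Configuration();
    conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, "SERIAL");
    JobFactory factory = DebugJobFactory.getFactory(
        submitter, scratch, numJobs, conf, startFlag);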
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java?rev=919277&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java Fri Mar  5 03:11:56 2010
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class DebugJobProducer implements JobStoryProducer {
+  public static final Log LOG = LogFactory.getLog(DebugJobProducer.class);
+  final ArrayList<JobStory> submitted;
+  private final Configuration conf;
+  private final AtomicInteger numJobs;
+
+  public DebugJobProducer(int numJobs, Configuration conf) {
+    super();
+    MockJob.reset();
+    this.conf = conf;
+    this.numJobs = new AtomicInteger(numJobs);
+    this.submitted = new ArrayList<JobStory>();
+  }
+
+  @Override
+  public JobStory getNextJob() throws IOException {
+    if (numJobs.getAndDecrement() > 0) {
+      final MockJob ret = new MockJob(conf);
+      submitted.add(ret);
+      return ret;
+    }
+    return null;
+  }
+
+  @Override
+  public void close() {
+  }
+
+  static double[] getDistr(Random r, double mindist, int size) {
+    assert 0.0 <= mindist && mindist <= 1.0;
+    final double min = mindist / size;
+    final double rem = 1.0 - min * size;
+    final double[] tmp = new double[size];
+    for (int i = 0; i < tmp.length - 1; ++i) {
+      tmp[i] = r.nextDouble() * rem;
+    }
+    tmp[tmp.length - 1] = rem;
+    Arrays.sort(tmp);
+
+    final double[] ret = new double[size];
+    ret[0] = tmp[0] + min;
+    for (int i = 1; i < size; ++i) {
+      ret[i] = tmp[i] - tmp[i - 1] + min;
+    }
+    return ret;
+  }
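A note on getDistr() above: it draws size - 1 uniform points over the
mass left after reserving mindist/size for every bucket, sorts them, and
takes adjacent differences, so the result always sums to 1 with each
fraction at least mindist/size. A standalone sanity check (hypothetical
test code in the same package, not part of this commit):

    // Verify getDistr returns a valid distribution with the promised floor.
    double[] d = DebugJobProducer.getDistr(new Random(42L), 0.5, 4);
    double sum = 0.0;
    for (double v : d) {
      assert v >= 0.5 / 4 : "fraction below the mindist/size floor";
      sum += v;
    }
    assert Math.abs(sum - 1.0) < 1e-9 : "fractions should sum to 1";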
+  /**
+   * Generate random task data for a synthetic job.
+   */
+  static class MockJob implements JobStory {
+
+    static final int MIN_REC = 1 << 14;
+    static final int MIN_BYTES = 1 << 20;
+    static final int VAR_REC = 1 << 14;
+    static final int VAR_BYTES = 4 << 20;
+    static final int MAX_MAP = 5;
+    static final int MAX_RED = 3;
+
+    static void initDist(
+      Random r, double min, int[] recs, long[] bytes, long tot_recs,
+      long tot_bytes) {
+      final double[] recs_dist = getDistr(r, min, recs.length);
+      final double[] bytes_dist = getDistr(r, min, recs.length);
+      long totalbytes = 0L;
+      int totalrecs = 0;
+      for (int i = 0; i < recs.length; ++i) {
+        recs[i] = (int) Math.round(tot_recs * recs_dist[i]);
+        bytes[i] = Math.round(tot_bytes * bytes_dist[i]);
+        totalrecs += recs[i];
+        totalbytes += bytes[i];
+      }
+      // Add/remove excess
+      recs[0] += totalrecs - tot_recs;
+      bytes[0] += totalbytes - tot_bytes;
+      if (LOG.isInfoEnabled()) {
+        LOG.info(
+          "DIST: " + Arrays.toString(recs) + " " + tot_recs + "/" + totalrecs +
+            " " + Arrays.toString(bytes) + " " + tot_bytes + "/" + totalbytes);
+      }
+    }
+
+    private static final AtomicInteger seq = new AtomicInteger(0);
+    // set timestamp in the past
+    private static final AtomicLong timestamp = new AtomicLong(
+      System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(
+        60, TimeUnit.DAYS));
+
+    private final int id;
+    private final String name;
+    private final int[] m_recsIn, m_recsOut, r_recsIn, r_recsOut;
+    private final long[] m_bytesIn, m_bytesOut, r_bytesIn, r_bytesOut;
+    private final long submitTime;
+
+    public MockJob(Configuration conf) {
+      final Random r = new Random();
+      final long seed = r.nextLong();
+      r.setSeed(seed);
+      id = seq.getAndIncrement();
+      name = String.format("MOCKJOB%05d", id);
+
+      LOG.info(name + " (" + seed + ")");
+      submitTime = timestamp.addAndGet(
+        TimeUnit.MILLISECONDS.convert(
+          r.nextInt(10), TimeUnit.SECONDS));
+
+      m_recsIn = new int[r.nextInt(MAX_MAP) + 1];
+      m_bytesIn = new long[m_recsIn.length];
+      m_recsOut = new int[m_recsIn.length];
+      m_bytesOut = new long[m_recsIn.length];
+
+      r_recsIn = new int[r.nextInt(MAX_RED) + 1];
+      r_bytesIn = new long[r_recsIn.length];
+      r_recsOut = new int[r_recsIn.length];
+      r_bytesOut = new long[r_recsIn.length];
+
+      // map input
+      final long map_recs = r.nextInt(VAR_REC) + MIN_REC;
+      final long map_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+      initDist(r, 0.5, m_recsIn, m_bytesIn, map_recs, map_bytes);
+
+      // shuffle
+      final long shuffle_recs = r.nextInt(VAR_REC) + MIN_REC;
+      final long shuffle_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+      initDist(r, 0.5, m_recsOut, m_bytesOut, shuffle_recs, shuffle_bytes);
+      initDist(r, 0.8, r_recsIn, r_bytesIn, shuffle_recs, shuffle_bytes);
+
+      // reduce output
+      final long red_recs = r.nextInt(VAR_REC) + MIN_REC;
+      final long red_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+      initDist(r, 0.5, r_recsOut, r_bytesOut, red_recs, red_bytes);
+
+      if (LOG.isDebugEnabled()) {
+        int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
+        int iMapRTotal = 0, oMapRTotal = 0, iRedRTotal = 0, oRedRTotal = 0;
+        for (int i = 0; i < m_recsIn.length; ++i) {
+          iMapRTotal += m_recsIn[i];
+          iMapBTotal += m_bytesIn[i];
+          oMapRTotal += m_recsOut[i];
+          oMapBTotal += m_bytesOut[i];
+        }
+        for (int i = 0; i < r_recsIn.length; ++i) {
+          iRedRTotal += r_recsIn[i];
+          iRedBTotal += r_bytesIn[i];
+          oRedRTotal += r_recsOut[i];
+          oRedBTotal += r_bytesOut[i];
+        }
+        LOG.debug(
+          String.format(
+            "%s: M (%03d) %6d/%10d -> %6d/%10d" +
+              " R (%03d) %6d/%10d -> %6d/%10d @%d", name, m_bytesIn.length,
+            iMapRTotal, iMapBTotal, oMapRTotal, oMapBTotal, r_bytesIn.length,
+            iRedRTotal, iRedBTotal, oRedRTotal, oRedBTotal, submitTime));
+      }
+    }
+
+    @Override
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public String getUser() {
+      return "FOOBAR";
+    }
+
+    @Override
+    public JobID getJobID() {
+      return new JobID("job_mock_" + name, id);
+    }
+
+    @Override
+    public Values getOutcome() {
+      return Values.SUCCESS;
+    }
+
+    @Override
+    public long getSubmissionTime() {
+      return submitTime;
+    }
+
+    @Override
+    public int getNumberMaps() {
+      return m_bytesIn.length;
+    }
+
+    @Override
+    public int getNumberReduces() {
+      return r_bytesIn.length;
+    }
+
+    @Override
+    public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+      switch (taskType) {
+        case MAP:
+          return new TaskInfo(m_bytesIn[taskNumber], m_recsIn[taskNumber],
+            m_bytesOut[taskNumber], m_recsOut[taskNumber], -1);
+        case REDUCE:
+          return new TaskInfo(r_bytesIn[taskNumber], r_recsIn[taskNumber],
+            r_bytesOut[taskNumber], r_recsOut[taskNumber], -1);
+        default:
+          throw new IllegalArgumentException("Not interested");
+      }
+    }
+
+    @Override
+    public InputSplit[] getInputSplits() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public TaskAttemptInfo getTaskAttemptInfo(
+      TaskType taskType, int taskNumber, int taskAttemptNumber) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(
+      int taskNumber, int taskAttemptNumber, int locality) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.JobConf getJobConf() {
+      throw new UnsupportedOperationException();
+    }
+
+    public static void reset() {
+      seq.set(0);
+      timestamp.set(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(
+        60, TimeUnit.DAYS));
+    }
+  }
+}
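Because seq and timestamp are static, the new MockJob.reset() call in the
DebugJobProducer constructor keeps job names and submit times reproducible
when several test methods run in one JVM. The producer's contract is to
hand out exactly numJobs MockJobs and then null. A usage sketch
(hypothetical code in the same package, inside a method declared
throws IOException):

    // Drain the producer until it signals exhaustion with null.
    DebugJobProducer producer = new DebugJobProducer(3, new Configuration());
    JobStory job;
    while ((job = producer.getNextJob()) != null) {
      System.out.println(job.getName() + " maps=" + job.getNumberMaps()
          + " reduces=" + job.getNumberReduces());
    }
    producer.close();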
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=919277&r1=919276&r2=919277&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Fri Mar  5 03:11:56 2010
@@ -54,6 +54,8 @@
 import org.apache.log4j.Level;
 
 public class TestGridmixSubmission {
+  static GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.REPLAY;
+
   {
     ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.mapred.gridmix")
       ).getLogger().setLevel(Level.DEBUG);
@@ -96,8 +98,8 @@
     private final int expected;
     private final BlockingQueue<Job> retiredJobs;
 
-    public TestMonitor(int expected) {
-      super();
+    public TestMonitor(int expected, Statistics stats) {
+      super(stats);
       this.expected = expected;
       retiredJobs = new LinkedBlockingQueue<Job>();
     }
@@ -276,16 +278,18 @@
 
   static class DebugGridmix extends Gridmix {
 
-    private DebugJobFactory factory;
+    private JobFactory factory;
     private TestMonitor monitor;
 
     public void checkMonitor() throws Exception {
-      monitor.verify(factory.getSubmitted());
+      monitor.verify(((DebugJobFactory.Debuggable)factory).getSubmitted());
     }
 
     @Override
-    protected JobMonitor createJobMonitor() {
-      monitor = new TestMonitor(NJOBS + 1); // include data generation job
+    protected JobMonitor createJobMonitor(Statistics stats) {
+      Configuration conf = new Configuration();
+      conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy.name());
+      monitor = new TestMonitor(NJOBS + 1, stats);
       return monitor;
     }
 
@@ -293,16 +297,42 @@
     protected JobFactory createJobFactory(JobSubmitter submitter,
         String traceIn, Path scratchDir, Configuration conf,
         CountDownLatch startFlag) throws IOException {
-      factory =
-        new DebugJobFactory(submitter, scratchDir, NJOBS, conf, startFlag);
+      factory = DebugJobFactory.getFactory(
+        submitter, scratchDir, NJOBS, conf, startFlag);
       return factory;
     }
   }
 
   @Test
-  public void testSubmit() throws Exception {
+  public void testReplaySubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.REPLAY;
+    System.out.println(" Replay started at " + System.currentTimeMillis());
+    doSubmission();
+    System.out.println(" Replay ended at " + System.currentTimeMillis());
+  }
+
+  @Test
+  public void testStressSubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.STRESS;
+    System.out.println(" Stress started at " + System.currentTimeMillis());
+    doSubmission();
+    System.out.println(" Stress ended at " + System.currentTimeMillis());
+  }
+
+  @Test
+  public void testSerialSubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.SERIAL;
+    System.out.println("Serial started at " + System.currentTimeMillis());
+    doSubmission();
+    System.out.println("Serial ended at " + System.currentTimeMillis());
+  }
+
+  public void doSubmission() throws Exception {
     final Path in = new Path("foo").makeQualified(dfs);
     final Path out = new Path("/gridmix").makeQualified(dfs);
+    final Path root = new Path("/user");
+    Configuration conf = null;
+    try {
     final String[] argv = {
       "-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
       "-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out,
@@ -311,11 +341,19 @@
       "-" // ignored by DebugGridmix
     };
     DebugGridmix client = new DebugGridmix();
-    final Configuration conf = mrCluster.createJobConf();
+    conf = mrCluster.createJobConf();
+    conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy.name());
     //conf.setInt(Gridmix.GRIDMIX_KEY_LEN, 2);
     int res = ToolRunner.run(conf, client, argv);
     assertEquals("Client exited with nonzero status", 0, res);
     client.checkMonitor();
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw e;
+    } finally {
+      in.getFileSystem(conf).delete(in, true);
+      out.getFileSystem(conf).delete(out, true);
+      root.getFileSystem(conf).delete(root, true);
+    }
   }
 }
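A closing note on the test changes: each @Test method assigns the static
policy field, doSubmission() writes policy.name() into the job
configuration, and Gridmix reads it back via
GridmixJobSubmissionPolicy.getPolicy(conf, default). The round trip,
sketched with the property and signatures used elsewhere in this patch:

    // Hypothetical round trip: the name stored in conf selects the enum.
    Configuration conf = new Configuration();
    conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY,
        GridmixJobSubmissionPolicy.SERIAL.name());
    GridmixJobSubmissionPolicy p = GridmixJobSubmissionPolicy.getPolicy(
        conf, GridmixJobSubmissionPolicy.STRESS);
    assert p == GridmixJobSubmissionPolicy.SERIAL;

Since policy is mutable static state, the three submission tests cannot
safely run concurrently in the same JVM.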