hadoop-mapreduce-commits mailing list archives

From cdoug...@apache.org
Subject svn commit: r815628 [2/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/ src/contrib/gridmix/ src/contrib/gridmix/ivy/ src/contrib/gridmix/src/ src/contrib/gridmix/src/java/ src/contrib/gridmix/src/java/org/ src/contrib/gridmix/src/java/org/apache/ src/...
Date Wed, 16 Sep 2009 06:35:43 GMT
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java?rev=815628&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
(added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
Wed Sep 16 06:35:42 2009
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Component accepting submitted, running jobs and responsible for
+ * monitoring jobs for success and failure. Once a job is submitted, it is
+ * polled for status until complete. If a job completes, the monitor
+ * thread returns to the queue immediately; otherwise it sleeps for the
+ * configured poll interval before polling again.
+ */
+class JobMonitor implements Gridmix.Component<Job> {
+
+  public static final Log LOG = LogFactory.getLog(JobMonitor.class);
+
+  private final Queue<Job> mJobs;
+  private final MonitorThread mThread;
+  private final BlockingQueue<Job> runningJobs;
+  private final long pollDelayMillis;
+  private volatile boolean graceful = false;
+
+  /**
+   * Create a JobMonitor with a default polling interval of 5s.
+   */
+  public JobMonitor() {
+    this(5, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Create a JobMonitor that sleeps for the specified duration after
+   * polling a still-running job.
+   * @param pollDelay Delay after polling a running job
+   * @param unit Time unit for pollDelay (rounded to milliseconds)
+   */
+  public JobMonitor(int pollDelay, TimeUnit unit) {
+    mThread = new MonitorThread();
+    runningJobs = new LinkedBlockingQueue<Job>();
+    mJobs = new LinkedList<Job>();
+    this.pollDelayMillis = TimeUnit.MILLISECONDS.convert(pollDelay, unit);
+  }
+
+  /**
+   * Add a job to the polling queue.
+   */
+  public void add(Job job) throws InterruptedException {
+    runningJobs.put(job);
+  }
+
+  /**
+   * Temporary hook for recording job success.
+   */
+  protected void onSuccess(Job job) {
+    LOG.info(job.getJobName() + " succeeded");
+  }
+
+  /**
+   * Temporary hook for recording job failure.
+   */
+  protected void onFailure(Job job) {
+    LOG.info(job.getJobName() + " failed");
+  }
+
+  /**
+   * If shutdown before all jobs have completed, any still-running jobs
+   * may be extracted from the component. A warning is logged if the
+   * monitoring thread is still alive when this is called.
+   * @return Any jobs submitted and not known to have completed.
+   */
+  List<Job> getRemainingJobs() {
+    if (mThread.isAlive()) {
+      LOG.warn("Internal error: Polling running monitor for jobs");
+    }
+    synchronized (mJobs) {
+      return new ArrayList<Job>(mJobs);
+    }
+  }
+
+  /**
+   * Monitoring thread pulling running jobs from the component and into
+   * a queue to be polled for status.
+   */
+  private class MonitorThread extends Thread {
+
+    public MonitorThread() {
+      super("GridmixJobMonitor");
+    }
+
+    /**
+     * Check a job for success or failure.
+     */
+    public void process(Job job) throws IOException {
+      if (job.isSuccessful()) {
+        onSuccess(job);
+      } else {
+        onFailure(job);
+      }
+    }
+
+    @Override
+    public void run() {
+      boolean interrupted = false;
+      while (true) {
+        try {
+          synchronized (mJobs) {
+            runningJobs.drainTo(mJobs);
+          }
+
+          final boolean graceful = JobMonitor.this.graceful;
+          // shutdown conditions; either a graceful shutdown was requested
+          // and all monitored jobs have completed, or an abort was
+          // requested and no recently submitted jobs remain to be drained
+          if (interrupted && ((!graceful && runningJobs.isEmpty()) ||
+                               (graceful && mJobs.isEmpty()))) {
+            break;
+          }
+          while (!mJobs.isEmpty()) {
+            Job job;
+            synchronized (mJobs) {
+              job = mJobs.poll();
+            }
+            try {
+              if (job.isComplete()) {
+                process(job);
+                continue;
+              }
+            } catch (IOException e) {
+              if (e.getCause() instanceof ClosedByInterruptException) {
+                // Job doesn't throw InterruptedException, but RPC socket layer
+                // is blocking and may throw a wrapped Exception if this thread
+                // is interrupted. Since the lower level cleared the flag,
+                // reset it here
+                Thread.currentThread().interrupt();
+              } else {
+                LOG.warn("Lost job " + job.getJobName(), e);
+                continue;
+              }
+            }
+            synchronized (mJobs) {
+              if (!mJobs.offer(job)) {
+                LOG.error("Lost job " + job.getJobName()); // should never
+                                                           // happen
+              }
+            }
+            break;
+          }
+          try {
+            TimeUnit.MILLISECONDS.sleep(pollDelayMillis);
+          } catch (InterruptedException e) {
+            interrupted = true;
+            continue;
+          }
+        } catch (Throwable e) {
+          LOG.warn("Unexpected exception: ", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Start the internal, monitoring thread.
+   */
+  public void start() {
+    mThread.start();
+  }
+
+  /**
+   * Wait for the monitor to halt, assuming shutdown or abort have been
+   * called. Note that, since submission may be sporadic, this will hang
+   * if no form of shutdown has been requested.
+   */
+  public void join() throws InterruptedException {
+    mThread.join();
+  }
+
+  /**
+   * Drain all submitted jobs to a queue and stop the monitoring thread.
+   * Upstream submitter is assumed dead.
+   */
+  public void abort() {
+    graceful = false;
+    mThread.interrupt();
+  }
+
+  /**
+   * When all monitored jobs have completed, stop the monitoring thread.
+   * Upstream submitter is assumed dead.
+   */
+  public void shutdown() {
+    graceful = true;
+    mThread.interrupt();
+  }
+}
+
+
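For context, the lifecycle methods above compose in a fixed order: start(), any number of add() calls, then shutdown() (or abort()) before join(). A minimal, hypothetical driver sketch follows; it is not part of this commit, and it assumes it lives in the same package, since JobMonitor is package-private:

    package org.apache.hadoop.mapred.gridmix;

    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.mapreduce.Job;

    class MonitorDriverSketch {
      // Assumes `job` has already been submitted to the cluster.
      static List<Job> runToCompletion(Job job) throws InterruptedException {
        JobMonitor monitor = new JobMonitor(5, TimeUnit.SECONDS);
        monitor.start();     // spin up the GridmixJobMonitor thread
        monitor.add(job);    // hand the job off for periodic status polls
        monitor.shutdown();  // graceful: stop once monitored jobs complete
        monitor.join();      // block until the monitor thread exits
        return monitor.getRemainingJobs(); // empty unless jobs were lost
      }
    }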

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java?rev=815628&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
(added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
Wed Sep 16 06:35:42 2009
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Component accepting deserialized job traces, computing split data, and
+ * submitting to the cluster on deadline. Each job added from an upstream
+ * factory must be submitted to the cluster by the deadline recorded on it.
+ * Once submitted, jobs must be added to a downstream component for
+ * monitoring.
+ */
+class JobSubmitter implements Gridmix.Component<GridmixJob> {
+
+  public static final Log LOG = LogFactory.getLog(JobSubmitter.class);
+
+  final Semaphore sem;
+  private final FilePool inputDir;
+  private final JobMonitor monitor;
+  private final ExecutorService sched;
+  private volatile boolean shutdown = false;
+
+  /**
+   * Initialize the submission component with downstream monitor and pool of
+   * files from which split data may be read.
+   * @param monitor Monitor component to which jobs should be passed
+   * @param threads Number of submission threads
+   *   See {@link Gridmix#GRIDMIX_SUB_THR}.
+   * @param queueDepth Max depth of pending work queue
+   *   See {@link Gridmix#GRIDMIX_QUE_DEP}.
+   * @param inputDir Set of files from which split data may be mined for
+   *   synthetic jobs.
+   */
+  public JobSubmitter(JobMonitor monitor, int threads, int queueDepth,
+      FilePool inputDir) {
+    sem = new Semaphore(queueDepth);
+    sched = new ThreadPoolExecutor(threads, threads, 0L,
+        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+    this.inputDir = inputDir;
+    this.monitor = monitor;
+  }
+
+  /**
+   * Runnable wrapping a job to be submitted to the cluster.
+   */
+  private class SubmitTask implements Runnable {
+
+    final GridmixJob job;
+    public SubmitTask(GridmixJob job) {
+      this.job = job;
+    }
+    public void run() {
+      try {
+        // pre-compute split information
+        try {
+          job.buildSplits(inputDir);
+        } catch (IOException e) {
+          LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+          return;
+        }
+        // Sleep until deadline
+        long nsDelay = job.getDelay(TimeUnit.NANOSECONDS);
+        while (nsDelay > 0) {
+          TimeUnit.NANOSECONDS.sleep(nsDelay);
+          nsDelay = job.getDelay(TimeUnit.NANOSECONDS);
+        }
+        try {
+          // submit job
+          monitor.add(job.call());
+          LOG.debug("SUBMIT " + job + "@" + System.currentTimeMillis() +
+              " (" + job.getJob().getID() + ")");
+        } catch (IOException e) {
+          LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+        } catch (ClassNotFoundException e) {
+          LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+        }
+      } catch (InterruptedException e) {
+        // abort execution, remove splits if necessary
+        // TODO release ThreadLocal state
+        GridmixJob.pullDescription(job.id());
+        Thread.currentThread().interrupt();
+        return;
+      } finally {
+        sem.release();
+      }
+    }
+  }
+
+  /**
+   * Enqueue the job to be submitted per the deadline associated with it.
+   */
+  public void add(final GridmixJob job) throws InterruptedException {
+    final boolean addToQueue = !shutdown;
+    if (addToQueue) {
+      final SubmitTask task = new SubmitTask(job);
+      sem.acquire();
+      try {
+        sched.execute(task);
+      } catch (RejectedExecutionException e) {
+        sem.release();
+      }
+    }
+  }
+
+  /**
+   * (Re)scan the set of input files from which splits are derived.
+   */
+  public void refreshFilePool() throws IOException {
+    inputDir.refresh();
+  }
+
+  /**
+   * Does nothing, as the threadpool is already initialized and waiting for
+   * work from the upstream factory.
+   */
+  public void start() { }
+
+  /**
+   * Wait until all jobs pending submission have been submitted to the
+   * cluster.
+   * @throws IllegalStateException If neither shutdown nor abort has been
+   *   requested.
+   */
+  public void join() throws InterruptedException {
+    if (!shutdown) {
+      throw new IllegalStateException("Cannot wait for active submit thread");
+    }
+    sched.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Finish all jobs pending submission, but do not accept new work.
+   */
+  public void shutdown() {
+    // complete pending tasks, but accept no new tasks
+    shutdown = true;
+    sched.shutdown();
+  }
+
+  /**
+   * Discard pending work, including precomputed work waiting to be
+   * submitted.
+   */
+  public void abort() {
+    shutdown = true;
+    sched.shutdownNow();
+  }
+}
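The add()/SubmitTask pair above is an instance of the semaphore-bounded executor pattern: the producer acquires a permit before handing work to the pool, and the worker returns the permit in a finally block, so at most queueDepth jobs are pending or in flight at any time (the LinkedBlockingQueue feeding the pool is itself unbounded). A generic, self-contained sketch of the same pattern; the class name and bounds here are illustrative, not from this commit:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.Semaphore;

    class BoundedExecutorSketch {
      private final ExecutorService pool = Executors.newFixedThreadPool(4);
      private final Semaphore sem = new Semaphore(16); // max pending + running

      void submit(final Runnable work) throws InterruptedException {
        sem.acquire();         // blocks the producer at the bound
        try {
          pool.execute(new Runnable() {
            public void run() {
              try {
                work.run();
              } finally {
                sem.release(); // permit returned when the task completes
              }
            }
          });
        } catch (RejectedExecutionException e) {
          sem.release();       // pool already shut down; return the permit
        }
      }
    }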

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java?rev=815628&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
(added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
Wed Sep 16 06:35:42 2009
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.JobHistory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+
+/**
+ * Component generating random job traces for testing on a single node.
+ */
+class DebugJobFactory extends JobFactory {
+
+  public DebugJobFactory(JobSubmitter submitter, Path scratch, int numJobs,
+      Configuration conf, CountDownLatch startFlag) throws IOException {
+    super(submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
+        startFlag);
+  }
+
+  ArrayList<JobStory> getSubmitted() {
+    return ((DebugJobProducer)jobProducer).submitted;
+  }
+
+  private static class DebugJobProducer implements JobStoryProducer {
+    final ArrayList<JobStory> submitted;
+    private final Configuration conf;
+    private final AtomicInteger numJobs;
+
+    public DebugJobProducer(int numJobs, Configuration conf) {
+      super();
+      this.conf = conf;
+      this.numJobs = new AtomicInteger(numJobs);
+      this.submitted = new ArrayList<JobStory>();
+    }
+
+    @Override
+    public JobStory getNextJob() throws IOException {
+      if (numJobs.getAndDecrement() > 0) {
+        final MockJob ret = new MockJob(conf);
+        submitted.add(ret);
+        return ret;
+      }
+      return null;
+    }
+
+    @Override
+    public void close() { }
+  }
+
+  /**
+   * Generate random task data for a synthetic job.
+   */
+  static class MockJob implements JobStory {
+
+    public static final String MIN_BYTES_IN =   "gridmix.test.min.bytes.in";
+    public static final String VAR_BYTES_IN =   "gridmix.test.var.bytes.in";
+    public static final String MIN_BYTES_OUT =  "gridmix.test.min.bytes.out";
+    public static final String VAR_BYTES_OUT =  "gridmix.test.var.bytes.out";
+
+    public static final String MIN_REC_SIZE =   "gridmix.test.min.rec.bytes";
+    public static final String VAR_REC_SIZE =   "gridmix.test.var.rec.bytes";
+
+    public static final String MAX_MAPS =       "gridmix.test.max.maps";
+    public static final String MAX_REDS =       "gridmix.test.max.reduces";
+
+    private static final AtomicInteger seq = new AtomicInteger(0);
+    // set timestamps in the past
+    private static final AtomicLong timestamp =
+      new AtomicLong(System.currentTimeMillis() -
+        TimeUnit.MILLISECONDS.convert(60, TimeUnit.DAYS));
+
+    private final String name;
+    private final int[] m_recsIn, m_recsOut, r_recsIn, r_recsOut;
+    private final long[] m_bytesIn, m_bytesOut, r_bytesIn, r_bytesOut;
+    private final long submitTime;
+
+    public MockJob() {
+      this(new Configuration(false));
+    }
+
+    public MockJob(Configuration conf) {
+      this(conf.getInt(MIN_BYTES_IN, 1 << 20),
+           conf.getInt(VAR_BYTES_IN, 10 << 20),
+           conf.getInt(MIN_BYTES_OUT, 1 << 20),
+           conf.getInt(VAR_BYTES_OUT, 10 << 20),
+           conf.getInt(MIN_REC_SIZE , 100),
+           conf.getInt(VAR_REC_SIZE , 1 << 15),
+           conf.getInt(MAX_MAPS, 6),
+           conf.getInt(MAX_REDS, 4));
+    }
+
+    public MockJob(int min_bytes_in, int var_bytes_in,
+                   int min_bytes_out, int var_bytes_out,
+                   int min_rec_size, int var_rec_size,
+                   int max_maps, int max_reds) {
+      final Random r = new Random();
+      name = String.format("MOCKJOB%04d", seq.getAndIncrement());
+      submitTime = timestamp.addAndGet(TimeUnit.MILLISECONDS.convert(
+            r.nextInt(10), TimeUnit.SECONDS));
+      int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
+      int iMapRTotal = 0, oMapRTotal = 0, iRedRTotal = 0, oRedRTotal = 0;
+
+      final int iAvgMapRec = r.nextInt(var_rec_size) + min_rec_size;
+      final int oAvgMapRec = r.nextInt(var_rec_size) + min_rec_size;
+
+      // MAP
+      m_bytesIn = new long[r.nextInt(max_maps) + 1];
+      m_bytesOut = new long[m_bytesIn.length];
+      m_recsIn = new int[m_bytesIn.length];
+      m_recsOut = new int[m_bytesIn.length];
+      for (int i = 0; i < m_bytesIn.length; ++i) {
+        m_bytesIn[i] = r.nextInt(var_bytes_in) + min_bytes_in;
+        iMapBTotal += m_bytesIn[i];
+        m_recsIn[i] = (int)(m_bytesIn[i] / iAvgMapRec);
+        iMapRTotal += m_recsIn[i];
+
+        m_bytesOut[i] = r.nextInt(var_bytes_out) + min_bytes_out;
+        oMapBTotal += m_bytesOut[i];
+        m_recsOut[i] = (int)(m_bytesOut[i] / oAvgMapRec);
+        oMapRTotal += m_recsOut[i];
+      }
+
+      // REDUCE
+      r_bytesIn = new long[r.nextInt(max_reds) + 1];
+      r_bytesOut = new long[r_bytesIn.length];
+      r_recsIn = new int[r_bytesIn.length];
+      r_recsOut = new int[r_bytesIn.length];
+      iRedBTotal = oMapBTotal;
+      iRedRTotal = oMapRTotal;
+      for (int j = 0; iRedBTotal > 0; ++j) {
+        int i = j % r_bytesIn.length;
+        final int inc = r.nextInt(var_bytes_out) + min_bytes_out;
+        r_bytesIn[i] += inc;
+        iRedBTotal -= inc;
+        if (iRedBTotal < 0) {
+          r_bytesIn[i] += iRedBTotal;
+          iRedBTotal = 0;
+        }
+        iRedRTotal += r_recsIn[i];
+        r_recsIn[i] = (int)(r_bytesIn[i] / oAvgMapRec);
+        iRedRTotal -= r_recsIn[i];
+        if (iRedRTotal < 0) {
+          r_recsIn[i] += iRedRTotal;
+          iRedRTotal = 0;
+        }
+
+        r_bytesOut[i] = r.nextInt(var_bytes_in) + min_bytes_in;
+        oRedBTotal += r_bytesOut[i];
+        r_recsOut[i] = (int)(r_bytesOut[i] / iAvgMapRec);
+        oRedRTotal += r_recsOut[i];
+      }
+      r_recsIn[0] += iRedRTotal;
+
+      if (LOG.isDebugEnabled()) {
+        iRedRTotal = 0;
+        iRedBTotal = 0;
+        for (int i = 0; i < r_bytesIn.length; ++i) {
+          iRedRTotal += r_recsIn[i];
+          iRedBTotal += r_bytesIn[i];
+        }
+        LOG.debug(String.format("%s: M (%03d) %6d/%10d -> %6d/%10d" +
+                             " R (%03d) %6d/%10d -> %6d/%10d @%d", name,
+            m_bytesIn.length, iMapRTotal, iMapBTotal, oMapRTotal, oMapBTotal,
+            r_bytesIn.length, iRedRTotal, iRedBTotal, oRedRTotal, oRedBTotal,
+            submitTime));
+      }
+    }
+
+    @Override
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public String getUser() {
+      return "FOOBAR";
+    }
+
+    @Override
+    public JobID getJobID() {
+      return null;
+    }
+
+    @Override
+    public JobHistory.Values getOutcome() {
+      return JobHistory.Values.SUCCESS;
+    }
+
+    @Override
+    public long getSubmissionTime() {
+      return submitTime;
+    }
+
+    @Override
+    public int getNumberMaps() {
+      return m_bytesIn.length;
+    }
+
+    @Override
+    public int getNumberReduces() {
+      return r_bytesIn.length;
+    }
+
+    @Override
+    public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+      switch (taskType) {
+        case MAP:
+          return new TaskInfo(m_bytesIn[taskNumber], m_recsIn[taskNumber],
+              m_bytesOut[taskNumber], m_recsOut[taskNumber], -1);
+        case REDUCE:
+          return new TaskInfo(r_bytesIn[taskNumber], r_recsIn[taskNumber],
+              r_bytesOut[taskNumber], r_recsOut[taskNumber], -1);
+        default:
+          throw new IllegalArgumentException("Not interested");
+      }
+    }
+
+    @Override
+    public InputSplit[] getInputSplits() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType,
+        int taskNumber, int taskAttemptNumber) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
+        int taskAttemptNumber, int locality) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.JobConf getJobConf() {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
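MockJob conserves data across the shuffle: the reduce loop above redistributes the total map output (oMapBTotal bytes, oMapRTotal records) over the reduce inputs until the byte total is exhausted, with a final record correction applied to r_recsIn[0]. A hypothetical inspection of one generated story, using only methods defined in this file; the sketch assumes the same package, since these classes are package-private:

    package org.apache.hadoop.mapred.gridmix;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskType;
    import org.apache.hadoop.tools.rumen.JobStory;
    import org.apache.hadoop.tools.rumen.TaskInfo;

    class MockJobSketch {
      static void inspect() {
        // Shrink the task counts so the generated trace stays small.
        Configuration conf = new Configuration(false);
        conf.setInt(DebugJobFactory.MockJob.MAX_MAPS, 4);
        conf.setInt(DebugJobFactory.MockJob.MAX_REDS, 2);

        JobStory story = new DebugJobFactory.MockJob(conf);
        for (int i = 0; i < story.getNumberMaps(); ++i) {
          // Each TaskInfo carries (bytesIn, recsIn, bytesOut, recsOut);
          // map input bytes fall in [MIN_BYTES_IN, MIN_BYTES_IN + VAR_BYTES_IN).
          TaskInfo info = story.getTaskInfo(TaskType.MAP, i);
        }
      }
    }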

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=815628&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
(added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
Wed Sep 16 06:35:42 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
+public class TestGridmixSubmission {
+  {
+    ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.mapred.gridmix")
+        ).getLogger().setLevel(Level.DEBUG);
+  }
+
+  private static FileSystem dfs = null;
+  private static MiniDFSCluster dfsCluster = null;
+  private static MiniMRCluster mrCluster = null;
+
+  private static final int NJOBS = 2;
+  private static final long GENDATA = 50; // in megabytes
+  private static final int GENSLOP = 100 * 1024; // +/- 100k for logs
+
+  @BeforeClass
+  public static void initCluster() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.job.tracker.retire.jobs", false);
+    dfsCluster = new MiniDFSCluster(conf, 3, true, null);
+    dfs = dfsCluster.getFileSystem();
+    mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1);
+  }
+
+  @AfterClass
+  public static void shutdownCluster() throws IOException {
+    if (mrCluster != null) {
+      mrCluster.shutdown();
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
+  }
+
+  static class TestMonitor extends JobMonitor {
+
+    private final int expected;
+    private final BlockingQueue<Job> retiredJobs;
+
+    public TestMonitor(int expected) {
+      super();
+      this.expected = expected;
+      retiredJobs = new LinkedBlockingQueue<Job>();
+    }
+
+    public void verify(ArrayList<JobStory> submitted) {
+      final ArrayList<Job> succeeded = new ArrayList<Job>();
+      assertEquals("Bad job count", expected, retiredJobs.drainTo(succeeded));
+    }
+
+    @Override
+    protected void onSuccess(Job job) {
+      retiredJobs.add(job);
+    }
+    @Override
+    protected void onFailure(Job job) {
+      fail("Job failure: " + job);
+    }
+  }
+
+  static class DebugGridmix extends Gridmix {
+
+    private DebugJobFactory factory;
+    private TestMonitor monitor;
+
+    public void checkMonitor() {
+      monitor.verify(factory.getSubmitted());
+    }
+
+    @Override
+    protected JobMonitor createJobMonitor() {
+      monitor = new TestMonitor(NJOBS + 1); // include data generation job
+      return monitor;
+    }
+
+    @Override
+    protected JobFactory createJobFactory(JobSubmitter submitter,
+        String traceIn, Path scratchDir, Configuration conf,
+        CountDownLatch startFlag) throws IOException {
+      factory =
+        new DebugJobFactory(submitter, scratchDir, NJOBS, conf, startFlag);
+      return factory;
+    }
+  }
+
+  @Test
+  public void testSubmit() throws Exception {
+    final Path in = new Path("foo").makeQualified(dfs);
+    final Path out = new Path("/gridmix").makeQualified(dfs);
+    final String[] argv = {
+      "-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
+      "-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out,
+      "-generate", String.valueOf(GENDATA) + "m",
+      in.toString(),
+      "-" // ignored by DebugGridmix
+    };
+    DebugGridmix client = new DebugGridmix();
+    final Configuration conf = mrCluster.createJobConf();
+    int res = ToolRunner.run(conf, client, argv);
+    assertEquals("Client exited with nonzero status", 0, res);
+    client.checkMonitor();
+    final ContentSummary generated = dfs.getContentSummary(in);
+    assertTrue("Mismatched data gen", // +/- 100k for logs
+        (GENDATA << 20) < generated.getLength() + GENSLOP &&
+        (GENDATA << 20) > generated.getLength() - GENSLOP);
+    FileStatus[] outstat = dfs.listStatus(out);
+    assertEquals("Mismatched job count", NJOBS, outstat.length);
+  }
+
+}


