hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r1077370 [1/2] - in /hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src: java/org/apache/hadoop/mapred/gridmix/ test/org/apache/hadoop/mapred/gridmix/
Date: Fri, 04 Mar 2011 04:08:07 GMT
Author: omalley
Date: Fri Mar  4 04:08:06 2011
New Revision: 1077370

URL: http://svn.apache.org/viewvc?rev=1077370&view=rev
Log:
commit 644f4591712e66e6b31862638dad0749c02c202f
Author: Rahul Kumar Singh <rksingh@yahoo-inc.com>
Date:   Tue Apr 6 12:26:34 2010 +0530

    MAPREDUCE:1376 from https://issues.apache.org/jira/secure/attachment/12440324/1376-5-yhadoop20-100.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1376. Support for varied user submissions in Gridmix (rksingh)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StatListener.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RoundRobinUserResolver.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SubmitterUserResolver.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestUserResolve.java

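Taken together, these changes replace the single JobFactory with policy-specific factories and route every synthetic job through a resolved user. For orientation, the UserResolver contract that the classes below implement looks roughly like this (a minimal sketch reconstructed from the setTargetUsers and getTargetUgi calls visible in this patch; imports and javadoc elided):

    public interface UserResolver {
      // Load the target users (for example from the file behind userdesc),
      // if the concrete resolver needs an external list at all.
      boolean setTargetUsers(URI userdesc, Configuration conf) throws IOException;
      // Map the user recorded in the trace to the UGI to submit as.
      UserGroupInformation getTargetUgi(UserGroupInformation ugi);
    }
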
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java?rev=1077370&r1=1077369&r2=1077370&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java Fri Mar  4 04:08:06 2011
@@ -19,16 +19,27 @@ package org.apache.hadoop.mapred.gridmix
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.ArrayList;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Echos the UGI offered.
  */
-public class EchoUserResolver extends UserResolver {
+public class EchoUserResolver implements UserResolver {
+  public static final Log LOG = LogFactory.getLog(Gridmix.class);
 
-  public EchoUserResolver() { }
+  public EchoUserResolver() {
+    LOG.info(" Current user resolver is EchoUserResolver ");
+  }
 
   public synchronized boolean setTargetUsers(URI userdesc, Configuration conf)
       throws IOException {

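EchoUserResolver now implements the UserResolver interface instead of extending a base class. Gridmix instantiates whichever resolver the configuration names, as the Gridmix.java hunk further down shows; a hedged usage sketch (GRIDMIX_USR_RSV is a constant whose string value does not appear in this patch, assumed accessible here):

    // Reflectively pick the resolver, defaulting to SubmitterUserResolver;
    // this mirrors the ReflectionUtils.newInstance call in Gridmix.runJob().
    UserResolver resolver = ReflectionUtils.newInstance(
        conf.getClass(Gridmix.GRIDMIX_USR_RSV, SubmitterUserResolver.class,
                      UserResolver.class), conf);
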
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java?rev=1077370&r1=1077369&r2=1077370&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java Fri Mar  4 04:08:06 2011
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.OutputStream;
+import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.List;
@@ -50,10 +51,12 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
 
 // TODO can replace with form of GridmixJob
 class GenerateData extends GridmixJob {
 
+
   /**
    * Total bytes to write.
    */
@@ -94,15 +97,26 @@ class GenerateData extends GridmixJob {
   @Override
   public Job call() throws IOException, InterruptedException,
                            ClassNotFoundException {
-    job.setMapperClass(GenDataMapper.class);
-    job.setNumReduceTasks(0);
-    job.setMapOutputKeyClass(NullWritable.class);
-    job.setMapOutputValueClass(BytesWritable.class);
-    job.setInputFormatClass(GenDataFormat.class);
-    job.setOutputFormatClass(RawBytesOutputFormat.class);
-    job.setJarByClass(GenerateData.class);
-    FileInputFormat.addInputPath(job, new Path("ignored"));
-    job.submit();
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+    job = ugi.doAs( new PrivilegedExceptionAction <Job>() {
+       public Job run() throws IOException, ClassNotFoundException,
+                               InterruptedException {
+        job.setMapperClass(GenDataMapper.class);
+        job.setNumReduceTasks(0);
+        job.setMapOutputKeyClass(NullWritable.class);
+        job.setMapOutputValueClass(BytesWritable.class);
+        job.setInputFormatClass(GenDataFormat.class);
+        job.setOutputFormatClass(RawBytesOutputFormat.class);
+        job.setJarByClass(GenerateData.class);
+         try {
+           FileInputFormat.addInputPath(job, new Path("ignored"));
+         } catch (IOException e) {
+           LOG.error("Error  while adding input path ",e);
+         }
+         job.submit();
+         return job;
+      }
+    });
     return job;
   }
 
@@ -262,7 +276,7 @@ class GenerateData extends GridmixJob {
       private final int blocksize;
       private final short replicas;
       private final long maxFileBytes;
-      private final FsPermission genPerms = new FsPermission((short) 0755);
+      private final FsPermission genPerms = new FsPermission((short) 0777);
 
       private long accFileBytes = 0L;
       private long fileIdx = -1L;

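The recurring pattern in this patch is wrapping job setup and submission in UserGroupInformation.doAs, so the work runs with the resolved user's credentials. Its minimal shape, extracted from the hunk above (assumes `job` is an initialized Job field):

    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
    job = ugi.doAs(new PrivilegedExceptionAction<Job>() {
      public Job run() throws IOException, InterruptedException,
                              ClassNotFoundException {
        // Everything here executes with ugi's credentials, so the cluster
        // sees the resolved user as the submitter of this job.
        job.submit();
        return job;
      }
    });
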
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=1077370&r1=1077369&r2=1077370&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Fri Mar  4 04:08:06 2011
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
@@ -38,6 +40,7 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.tools.rumen.ZombieJobProducer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -94,6 +97,7 @@ public class Gridmix extends Configured 
   private JobFactory factory;
   private JobSubmitter submitter;
   private JobMonitor monitor;
+  private Statistics statistics;
 
   // Shutdown hook
   private final Shutdown sdh = new Shutdown();
@@ -119,6 +123,15 @@ public class Gridmix extends Configured 
     if (!genData.getJob().isSuccessful()) {
       throw new IOException("Data generation failed!");
     }
+
+    FsShell shell = new FsShell(conf);
+    try {
+      LOG.info("Changing the permissions for inputPath " + ioPath.toString());
+      shell.run(new String[] {"-chmod","-R","777", ioPath.toString()});
+    } catch (Exception e) {
+      LOG.error("Couldnt change the file permissions " , e);
+      throw new IOException(e);
+    }
     LOG.info("Done.");
   }
 
@@ -142,46 +155,89 @@ public class Gridmix extends Configured 
   private void startThreads(Configuration conf, String traceIn, Path ioPath,
       Path scratchDir, CountDownLatch startFlag, UserResolver userResolver)
       throws IOException {
-    monitor = createJobMonitor();
-    submitter = createJobSubmitter(monitor,
-        conf.getInt(GRIDMIX_SUB_THR,
-          Runtime.getRuntime().availableProcessors() + 1),
-        conf.getInt(GRIDMIX_QUE_DEP, 5),
-        new FilePool(conf, ioPath));
-    factory = createJobFactory(submitter, traceIn, scratchDir, conf, startFlag,
-        userResolver);
-    monitor.start();
-    submitter.start();
-    factory.start();
+    try {
+      GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
+        conf, GridmixJobSubmissionPolicy.STRESS);
+      LOG.info(" Submission policy is " + policy.name());
+      statistics = new Statistics(conf, policy.getPollingInterval(), startFlag,userResolver);
+      monitor = createJobMonitor(statistics);
+      int noOfSubmitterThreads = policy.name().equals(
+        GridmixJobSubmissionPolicy.SERIAL.name()) ? 1 :
+        Runtime.getRuntime().availableProcessors() + 1;
+
+      submitter = createJobSubmitter(
+        monitor, conf.getInt(
+          GRIDMIX_SUB_THR, noOfSubmitterThreads), conf.getInt(
+          GRIDMIX_QUE_DEP, 5), new FilePool(
+          conf, ioPath), userResolver);
+      
+      factory = createJobFactory(
+        submitter, traceIn, scratchDir, conf, startFlag, userResolver);
+      if (policy.name().equals(GridmixJobSubmissionPolicy.SERIAL.name())) {
+        statistics.addJobStatsListeners(factory);
+      } else {
+        statistics.addClusterStatsObservers(factory);
+      }
+      
+      monitor.start();
+      submitter.start();
+    }catch(Exception e) {
+      LOG.error(" Exception at start " ,e);
+      throw new IOException(e);
+    }
   }
 
-  protected JobMonitor createJobMonitor() throws IOException {
-    return new JobMonitor();
+  protected JobMonitor createJobMonitor(Statistics stats) throws IOException {
+    return new JobMonitor(stats);
   }
 
   protected JobSubmitter createJobSubmitter(JobMonitor monitor, int threads,
-      int queueDepth, FilePool pool) throws IOException {
-    return new JobSubmitter(monitor, threads, queueDepth, pool);
+      int queueDepth, FilePool pool,UserResolver resolver) throws IOException {
+    return new JobSubmitter(monitor, threads, queueDepth, pool, resolver);
   }
 
-  protected JobFactory createJobFactory(JobSubmitter submitter, String traceIn,
-      Path scratchDir, Configuration conf, CountDownLatch startFlag,
-      UserResolver userResolver) throws IOException {
-    return new JobFactory(submitter, createInputStream(traceIn), scratchDir,
-        conf, startFlag, userResolver);
+  protected JobFactory createJobFactory(
+    JobSubmitter submitter, String traceIn, Path scratchDir, Configuration conf,
+    CountDownLatch startFlag, UserResolver resolver)
+    throws IOException {
+    return GridmixJobSubmissionPolicy.getPolicy(
+      conf, GridmixJobSubmissionPolicy.STRESS).createJobFactory(
+      submitter, new ZombieJobProducer(
+        createInputStream(
+          traceIn), null), scratchDir, conf, startFlag, resolver);
   }
 
   public int run(final String[] argv) throws IOException, InterruptedException {
+    int val = -1;
+    final Configuration conf = getConf();
+    UserGroupInformation.setConfiguration(conf);
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+
+    val = ugi.doAs(new PrivilegedExceptionAction<Integer>() {
+      public Integer run() throws Exception {
+        return runJob(conf,argv);
+      }
+    });
+    return val; 
+  }
+
+  private static UserResolver userResolver;
+
+  public UserResolver getCurrentUserResolver() {
+    return userResolver;
+  }
+
+  private int runJob(Configuration conf, String[] argv)
+    throws IOException, InterruptedException {
     if (argv.length < 2) {
       printUsage(System.err);
       return 1;
     }
-    final Configuration conf = getConf();
     long genbytes = -1L;
     String traceIn = null;
     Path ioPath = null;
     URI userRsrc = null;
-    final UserResolver userResolver = ReflectionUtils.newInstance(
+    userResolver = ReflectionUtils.newInstance(
         conf.getClass(GRIDMIX_USR_RSV, SubmitterUserResolver.class,
           UserResolver.class), conf);
     try {
@@ -229,6 +285,8 @@ public class Gridmix extends Configured 
         }
         // scan input dir contents
         submitter.refreshFilePool();
+        factory.start();
+        statistics.start();
       } catch (Throwable e) {
         LOG.error("Startup failed", e);
         if (factory != null) factory.abort(); // abort pipeline
@@ -236,7 +294,6 @@ public class Gridmix extends Configured 
         // signal for factory to start; sets start time
         startFlag.countDown();
       }
-
       if (factory != null) {
         // wait for input exhaustion
         factory.join(Long.MAX_VALUE);
@@ -251,6 +308,10 @@ public class Gridmix extends Configured 
         // wait for running tasks to complete
         monitor.shutdown();
         monitor.join(Long.MAX_VALUE);
+
+        statistics.shutdown();
+        statistics.join(Long.MAX_VALUE);
+
       }
     } finally {
       IOUtils.cleanup(LOG, trace);
@@ -289,6 +350,7 @@ public class Gridmix extends Configured 
         killComponent(factory, FAC_SLEEP);   // read no more tasks
         killComponent(submitter, SUB_SLEEP); // submit no more tasks
         killComponent(monitor, MON_SLEEP);   // process remaining jobs here
+        killComponent(statistics,MON_SLEEP);
       } finally {
         if (monitor == null) {
           return;

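The data-generation path now loosens permissions on the generated input via a programmatic FsShell call. FsShell.run returns an int exit status that the hunk above does not check; a hedged sketch that does:

    FsShell shell = new FsShell(conf);
    try {
      // Same effect as the command line: hadoop fs -chmod -R 777 <ioPath>
      int exit = shell.run(new String[] {"-chmod", "-R", "777", ioPath.toString()});
      if (exit != 0) {
        throw new IOException("chmod -R 777 " + ioPath + " exited with " + exit);
      }
    } catch (Exception e) {
      throw new IOException(e);
    }
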
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=1077370&r1=1077369&r2=1077370&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Fri Mar  4 04:08:06 2011
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
+import java.security.PrivilegedExceptionAction;
 import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -80,19 +81,40 @@ class GridmixJob implements Callable<Job
 
   private final int seq;
   private final Path outdir;
-  protected final Job job;
+  protected Job job;
   private final JobStory jobdesc;
   private final UserGroupInformation ugi;
   private final long submissionTimeNanos;
 
-  public GridmixJob(Configuration conf, long submissionMillis,
-      JobStory jobdesc, Path outRoot, UserGroupInformation ugi, int seq)
-      throws IOException {
+  public GridmixJob(
+    final Configuration conf, long submissionMillis, final JobStory jobdesc,
+    Path outRoot, UserGroupInformation ugi, final int seq) throws IOException {
     this.ugi = ugi;
-    UserGroupInformation.setCurrentUser(ugi);
-    conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi.toString());
     ((StringBuilder)nameFormat.get().out()).setLength(JOBNAME.length());
-    job = new Job(conf, nameFormat.get().format("%05d", seq).toString());
+    try {
+      job = this.ugi.doAs(new PrivilegedExceptionAction<Job>() {
+        public Job run(){
+          try {
+            return new Job(
+              conf, nameFormat.get().format(
+                "%05d", seq).toString());
+          } catch (IOException e) {
+            LOG.error(" Could not run job submitted " + jobdesc.getName());
+            return null;
+          }
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } catch (IOException e) {
+      throw e;
+    }
+
+    if(job == null) {
+      throw new IOException(
+        " Could not create Job instance for job " + jobdesc.getName());
+    }
+    
     submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
         submissionMillis, TimeUnit.MILLISECONDS);
     this.jobdesc = jobdesc;
@@ -100,18 +122,29 @@ class GridmixJob implements Callable<Job
     outdir = new Path(outRoot, "" + seq);
   }
 
-  protected GridmixJob(Configuration conf, long submissionMillis, String name)
-      throws IOException {
-    job = new Job(conf, name);
+  protected GridmixJob(
+    final Configuration conf, long submissionMillis, final String name)
+  throws IOException {
     submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
         submissionMillis, TimeUnit.MILLISECONDS);
     jobdesc = null;
     outdir = null;
     seq = -1;
+    ugi = UserGroupInformation.getCurrentUser();
+
     try {
-      ugi = UnixUserGroupInformation.login(conf);
-    } catch (LoginException e) {
-      throw new IOException("Could not identify submitter", e);
+      job = this.ugi.doAs(new PrivilegedExceptionAction<Job>() {
+          public Job run(){
+            try {
+              return new Job(conf,name);
+            } catch (IOException e) {
+              LOG.error(" Could not run job submitted " + name);
+              return null;
+            }
+          }
+        });
+    } catch (InterruptedException e) {
+      LOG.error(" Error while creating new job " , e);
     }
   }
 
@@ -176,24 +209,49 @@ class GridmixJob implements Callable<Job
 
   public Job call() throws IOException, InterruptedException,
                            ClassNotFoundException {
-    job.setMapperClass(GridmixMapper.class);
-    job.setReducerClass(GridmixReducer.class);
-    job.setNumReduceTasks(jobdesc.getNumberReduces());
-    job.setMapOutputKeyClass(GridmixKey.class);
-    job.setMapOutputValueClass(GridmixRecord.class);
-    job.setSortComparatorClass(GridmixKey.Comparator.class);
-    job.setGroupingComparatorClass(SpecGroupingComparator.class);
-    job.setInputFormatClass(GridmixInputFormat.class);
-    job.setOutputFormatClass(RawBytesOutputFormat.class);
-    job.setPartitionerClass(DraftPartitioner.class);
-    job.setJarByClass(GridmixJob.class);
-    job.getConfiguration().setInt("gridmix.job.seq", seq);
-    job.getConfiguration().set(ORIGNAME, null == jobdesc.getJobID()
-        ? "<unknown>" : jobdesc.getJobID().toString());
-    job.getConfiguration().setBoolean("mapred.used.genericoptionsparser", true);
-    FileInputFormat.addInputPath(job, new Path("ignored"));
-    FileOutputFormat.setOutputPath(job, outdir);
-    job.submit();
+    job = ugi.doAs(
+      new PrivilegedExceptionAction<Job>() {
+        public Job run() {
+          job.setMapperClass(GridmixMapper.class);
+          job.setReducerClass(GridmixReducer.class);
+          job.setNumReduceTasks(jobdesc.getNumberReduces());
+          job.setMapOutputKeyClass(GridmixKey.class);
+          job.setMapOutputValueClass(GridmixRecord.class);
+          job.setSortComparatorClass(GridmixKey.Comparator.class);
+          job.setGroupingComparatorClass(SpecGroupingComparator.class);
+          job.setInputFormatClass(GridmixInputFormat.class);
+          job.setOutputFormatClass(RawBytesOutputFormat.class);
+          job.setPartitionerClass(DraftPartitioner.class);
+          job.setJarByClass(GridmixJob.class);
+          job.getConfiguration().setInt("gridmix.job.seq", seq);
+          job.getConfiguration().set(
+            ORIGNAME, null == jobdesc.getJobID() ? "<unknown>" :
+              jobdesc.getJobID().toString());
+          job.getConfiguration().setBoolean(
+            "mapred.used.genericoptionsparser", true);
+          try {
+            FileInputFormat.addInputPath(job, new Path("ignored"));
+          } catch (IOException e) {
+            LOG.error(" Exception while addingInpuPath job " , e);
+            return null;
+          }
+          FileOutputFormat.setOutputPath(job, outdir);
+          try {
+            job.submit();
+          } catch (IOException e) {
+            LOG.error(" Exception while submitting job " , e);
+            return null;
+          } catch (InterruptedException e) {
+            LOG.error(" Exception while submitting job " , e);
+            return null;
+          } catch (ClassNotFoundException e) {
+            LOG.error(" Exception while submitting job " , e);
+            return null;
+          }
+          return job;
+        }
+      });
+
     return job;
   }
 

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java?rev=1077370&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java Fri Mar  4 04:08:06 2011
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
+import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
+
+import java.util.concurrent.CountDownLatch;
+import java.io.IOException;
+
+enum GridmixJobSubmissionPolicy {
+
+  REPLAY("REPLAY",320000) {
+    @Override
+    public JobFactory<ClusterStats> createJobFactory(
+      JobSubmitter submitter, JobStoryProducer producer, Path scratchDir,
+      Configuration conf, CountDownLatch startFlag, UserResolver userResolver)
+      throws IOException {
+      return new ReplayJobFactory(
+        submitter, producer, scratchDir, conf, startFlag,userResolver);
+    }},
+
+  STRESS("STRESS",5000) {
+    @Override
+    public JobFactory<ClusterStats> createJobFactory(
+      JobSubmitter submitter, JobStoryProducer producer, Path scratchDir,
+      Configuration conf, CountDownLatch startFlag, UserResolver userResolver)
+      throws IOException {
+      return new StressJobFactory(
+        submitter, producer, scratchDir, conf, startFlag,userResolver);
+    }},
+
+  SERIAL("SERIAL",0) {
+    @Override
+    public JobFactory<JobStats> createJobFactory(
+      JobSubmitter submitter, JobStoryProducer producer, Path scratchDir,
+      Configuration conf, CountDownLatch startFlag, UserResolver userResolver)
+      throws IOException {
+      return new SerialJobFactory(
+        submitter, producer, scratchDir, conf, startFlag,userResolver);
+    }
+  };
+
+  public static final String JOB_SUBMISSION_POLICY =
+    "gridmix.job-submission.policy";
+
+  private final String name;
+  private final int pollingInterval;
+
+  GridmixJobSubmissionPolicy(String name,int pollingInterval) {
+    this.name = name;
+    this.pollingInterval = pollingInterval;
+  }
+
+  public abstract JobFactory createJobFactory(
+    JobSubmitter submitter, JobStoryProducer producer, Path scratchDir,
+    Configuration conf, CountDownLatch startFlag, UserResolver userResolver)
+    throws IOException;
+
+  public int getPollingInterval() {
+    return pollingInterval;
+  }
+
+  public static GridmixJobSubmissionPolicy getPolicy(
+    Configuration conf, GridmixJobSubmissionPolicy defaultPolicy) {
+    String policy = conf.get(JOB_SUBMISSION_POLICY, defaultPolicy.name());
+    return valueOf(policy.toUpperCase());
+  }
+}

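Each enum constant carries its own createJobFactory override, so selecting a policy is a configuration lookup followed by a virtual call. From inside the gridmix package (the enum is package-private), usage looks like this; the polling interval is presumably milliseconds:

    Configuration conf = new Configuration();
    // Accepted values are the enum names: REPLAY, STRESS or SERIAL.
    conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, "SERIAL");
    GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
        conf, GridmixJobSubmissionPolicy.STRESS);   // STRESS is Gridmix's default
    int pollInterval = policy.getPollingInterval(); // 0 for SERIAL
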
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java?rev=1077370&r1=1077369&r2=1077370&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java Fri Mar  4 04:08:06 2011
@@ -17,29 +17,28 @@
  */
 package org.apache.hadoop.mapred.gridmix;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.tools.rumen.JobStoryProducer;
 import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
-import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
 import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
 import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.tools.rumen.ZombieJobProducer;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 /**
@@ -49,20 +48,21 @@ import org.apache.commons.logging.LogFac
  * construction.
  * @see org.apache.hadoop.tools.rumen.HadoopLogsAnalyzer
  */
-class JobFactory implements Gridmix.Component<Void> {
+abstract class JobFactory<T> implements Gridmix.Component<Void>,StatListener<T>{
 
   public static final Log LOG = LogFactory.getLog(JobFactory.class);
 
-  private final Path scratch;
-  private final float rateFactor;
-  private final Configuration conf;
-  private final ReaderThread rThread;
-  private final AtomicInteger sequence;
-  private final JobSubmitter submitter;
-  private final CountDownLatch startFlag;
-  private final UserResolver userResolver;
-  private volatile IOException error = null;
+  protected final Path scratch;
+  protected final float rateFactor;
+  protected final Configuration conf;
+  protected final Thread rThread;
+  protected final AtomicInteger sequence;
+  protected final JobSubmitter submitter;
+  protected final CountDownLatch startFlag;
+  protected final UserResolver userResolver;
+  protected volatile IOException error = null;
   protected final JobStoryProducer jobProducer;
+  protected final ReentrantLock lock = new ReentrantLock(true);
 
   /**
    * Creating a new instance does not start the thread.
@@ -72,6 +72,7 @@ class JobFactory implements Gridmix.Comp
    * @param scratch Directory into which to write output from simulated jobs
    * @param conf Config passed to all jobs to be submitted
    * @param startFlag Latch released from main to start pipeline
+   * @throws java.io.IOException
    */
   public JobFactory(JobSubmitter submitter, InputStream jobTrace,
       Path scratch, Configuration conf, CountDownLatch startFlag,
@@ -98,10 +99,14 @@ class JobFactory implements Gridmix.Comp
     this.conf = new Configuration(conf);
     this.submitter = submitter;
     this.startFlag = startFlag;
-    this.rThread = new ReaderThread();
+    this.rThread = createReaderThread();
+    if(LOG.isDebugEnabled()) {
+      LOG.debug(" The submission thread name is " + rThread.getName());
+    }
     this.userResolver = userResolver;
   }
 
+
   static class MinTaskInfo extends TaskInfo {
     public MinTaskInfo(TaskInfo info) {
       super(info.getInputBytes(), info.getInputRecords(),
@@ -125,7 +130,7 @@ class JobFactory implements Gridmix.Comp
     }
   }
 
-  static class FilterJobStory implements JobStory {
+  protected static class FilterJobStory implements JobStory {
 
     protected final JobStory job;
 
@@ -157,76 +162,23 @@ class JobFactory implements Gridmix.Comp
     }
   }
 
-  /**
-   * Worker thread responsible for reading descriptions, assigning sequence
-   * numbers, and normalizing time.
-   */
-  private class ReaderThread extends Thread {
-
-    public ReaderThread() {
-      super("GridmixJobFactory");
-    }
-
-    private JobStory getNextJobFiltered() throws IOException {
-      JobStory job;
-      do {
-        job = jobProducer.getNextJob();
-      } while (job != null
-          && (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS ||
-              job.getSubmissionTime() < 0));
-      return null == job ? null : new FilterJobStory(job) {
-          @Override
-          public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
-            return new MinTaskInfo(this.job.getTaskInfo(taskType, taskNumber));
-          }
-        };
-    }
-
-    @Override
-    public void run() {
-      try {
-        startFlag.await();
-        if (Thread.currentThread().isInterrupted()) {
-          return;
-        }
-        final long initTime = TimeUnit.MILLISECONDS.convert(
-            System.nanoTime(), TimeUnit.NANOSECONDS);
-        LOG.debug("START @ " + initTime);
-        long first = -1;
-        long last = -1;
-        while (!Thread.currentThread().isInterrupted()) {
-          try {
-            final JobStory job = getNextJobFiltered();
-            if (null == job) {
-              return;
-            }
-            if (first < 0) {
-              first = job.getSubmissionTime();
-            }
-            final long current = job.getSubmissionTime();
-            if (current < last) {
-              LOG.warn("Job " + job.getJobID() + " out of order");
-              continue;
-            }
-            last = current;
-            submitter.add(new GridmixJob(new Configuration(conf), initTime +
-                  Math.round(rateFactor * (current - first)), job, scratch,
-                  userResolver.getTargetUgi(job.getUser()),
-                sequence.getAndIncrement()));
-          } catch (IOException e) {
-            JobFactory.this.error = e;
-            return;
-          }
-        }
-      } catch (InterruptedException e) {
-        // exit thread; ignore any jobs remaining in the trace
-        return;
-      } finally {
-        IOUtils.cleanup(null, jobProducer);
-      }
-    }
-  }
+  protected abstract Thread createReaderThread() ;
 
+  protected JobStory getNextJobFiltered() throws IOException {
+    JobStory job;
+    do {
+      job = jobProducer.getNextJob();
+    } while (job != null
+        && (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS ||
+            job.getSubmissionTime() < 0));
+    return null == job ? null : new FilterJobStory(job) {
+        @Override
+        public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+          return new MinTaskInfo(this.job.getTaskInfo(taskType, taskNumber));
+         }
+      };
+   }
+     
   /**
    * Obtain the error that caused the thread to exit unexpectedly.
    */

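JobFactory is now a template: the shared trace-reading logic (getNextJobFiltered) stays in the base class while each policy supplies its own reader thread through createReaderThread(). Schematically (the class name here is hypothetical, not the actual source):

    abstract class TraceFactory {
      protected final Thread rThread;
      TraceFactory() {
        // The patch likewise calls an overridable method from the
        // constructor; subclass fields are not initialized at that point.
        rThread = createReaderThread();
      }
      protected abstract Thread createReaderThread();
      public void start() { rThread.start(); }
    }
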
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java?rev=1077370&r1=1077369&r2=1077370&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java Fri Mar  4 04:08:06 2011
@@ -47,14 +47,12 @@ class JobMonitor implements Gridmix.Comp
   private final MonitorThread mThread;
   private final BlockingQueue<Job> runningJobs;
   private final long pollDelayMillis;
+  private Statistics statistics;
   private boolean graceful = false;
   private boolean shutdown = false;
 
-  /**
-   * Create a JobMonitor with a default polling interval of 5s.
-   */
-  public JobMonitor() {
-    this(5, TimeUnit.SECONDS);
+  public JobMonitor(Statistics statistics) {
+    this(5,TimeUnit.SECONDS, statistics);
   }
 
   /**
@@ -62,12 +60,14 @@ class JobMonitor implements Gridmix.Comp
    * polling a still-running job.
    * @param pollDelay Delay after polling a running job
    * @param unit Time unit for pollDelaySec (rounded to milliseconds)
+   * @param statistics StatCollector , listener to job completion.
    */
-  public JobMonitor(int pollDelay, TimeUnit unit) {
+  public JobMonitor(int pollDelay, TimeUnit unit, Statistics statistics) {
     mThread = new MonitorThread();
     runningJobs = new LinkedBlockingQueue<Job>();
     mJobs = new LinkedList<Job>();
     this.pollDelayMillis = TimeUnit.MILLISECONDS.convert(pollDelay, unit);
+    this.statistics = statistics;
   }
 
   /**
@@ -78,6 +78,17 @@ class JobMonitor implements Gridmix.Comp
   }
 
   /**
+   * Add a job whose submission failed, so that the failure can be
+   * communicated back to serial mode.
+   * TODO: Cleaner solution for this problem
+   * @param job
+   */
+  public void submissionFailed(Job job) {
+    LOG.info(" Job submission failed notify if anyone is waiting " + job);
+    this.statistics.add(job);
+  }
+
+  /**
    * Temporary hook for recording job success.
    */
   protected void onSuccess(Job job) {
@@ -162,6 +173,7 @@ class JobMonitor implements Gridmix.Comp
             try {
               if (job.isComplete()) {
                 process(job);
+                statistics.add(job);
                 continue;
               }
             } catch (IOException e) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java?rev=1077370&r1=1077369&r2=1077370&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java Fri Mar  4 04:08:06 2011
@@ -25,6 +25,7 @@ import java.util.concurrent.RejectedExec
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,6 +48,7 @@ class JobSubmitter implements Gridmix.Co
   private final JobMonitor monitor;
   private final ExecutorService sched;
   private volatile boolean shutdown = false;
+  private final UserResolver resolver;
 
   /**
    * Initialize the submission component with downstream monitor and pool of
@@ -60,12 +62,13 @@ class JobSubmitter implements Gridmix.Co
    *   synthetic jobs.
    */
   public JobSubmitter(JobMonitor monitor, int threads, int queueDepth,
-      FilePool inputDir) {
+      FilePool inputDir, UserResolver resolver) {
     sem = new Semaphore(queueDepth);
     sched = new ThreadPoolExecutor(threads, threads, 0L,
         TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
     this.inputDir = inputDir;
     this.monitor = monitor;
+    this.resolver = resolver;
   }
 
   /**
@@ -86,6 +89,12 @@ class JobSubmitter implements Gridmix.Co
         } catch (IOException e) {
           LOG.warn("Failed to submit " + job.getJob().getJobName() + " as " +
               job.getUgi(), e);
+          monitor.submissionFailed(job.getJob());
+          return;
+        }catch (Exception e) {
+          LOG.warn("Failed to submit " + job.getJob().getJobName() + " as " +
+              job.getUgi(), e);
+          monitor.submissionFailed(job.getJob());
           return;
         }
         // Sleep until deadline
@@ -106,18 +115,24 @@ class JobSubmitter implements Gridmix.Co
             throw new InterruptedException("Failed to submit " +
                 job.getJob().getJobName());
           }
+          monitor.submissionFailed(job.getJob());
         } catch (ClassNotFoundException e) {
           LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+          monitor.submissionFailed(job.getJob());
         }
       } catch (InterruptedException e) {
         // abort execution, remove splits if nesc
         // TODO release ThdLoc
         GridmixJob.pullDescription(job.id());
         Thread.currentThread().interrupt();
-        return;
+        monitor.submissionFailed(job.getJob());
+      } catch(Exception e) {
+        // Due to some exception the job wasn't submitted.
+        LOG.info(" Job " + job.getJob() + " submission failed " , e);
+        monitor.submissionFailed(job.getJob());
       } finally {
         sem.release();
-      }
+      }                               
     }
   }
 

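JobSubmitter bounds in-flight submissions with a Semaphore sized to the queue depth, released in the finally block above. A hypothetical stand-alone version of that pattern (java.util.concurrent imports elided):

    class BoundedSubmitter {
      private final ExecutorService sched;
      private final Semaphore sem;

      BoundedSubmitter(int threads, int queueDepth) {
        sched = new ThreadPoolExecutor(threads, threads, 0L,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        sem = new Semaphore(queueDepth);
      }

      void add(final Runnable work) throws InterruptedException {
        sem.acquire();               // block while queueDepth tasks are in flight
        sched.execute(new Runnable() {
          public void run() {
            try {
              work.run();
            } finally {
              sem.release();         // free the slot even if the work fails
            }
          }
        });
      }
    }
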
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java?rev=1077370&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java Fri Mar  4 04:08:06 2011
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+ class ReplayJobFactory extends JobFactory<Statistics.ClusterStats> {
+  public static final Log LOG = LogFactory.getLog(ReplayJobFactory.class);
+
+  /**
+   * Creating a new instance does not start the thread.
+   *
+   * @param submitter   Component to which deserialized jobs are passed
+   * @param jobProducer Job story producer
+   *                    {@link org.apache.hadoop.tools.rumen.ZombieJobProducer}
+   * @param scratch     Directory into which to write output from simulated jobs
+   * @param conf        Config passed to all jobs to be submitted
+   * @param startFlag   Latch released from main to start pipeline
+   * @param resolver
+   * @throws java.io.IOException
+   */
+  public ReplayJobFactory(
+    JobSubmitter submitter, JobStoryProducer jobProducer, Path scratch,
+    Configuration conf, CountDownLatch startFlag, UserResolver resolver)
+    throws IOException {
+    super(submitter, jobProducer, scratch, conf, startFlag,resolver);
+  }
+
+   
+    @Override
+  public Thread createReaderThread() {
+    return new ReplayReaderThread("ReplayJobFactory");
+  }
+
+   /**
+    * @param item
+    */
+   public void update(Statistics.ClusterStats item) {
+   }
+
+   private class ReplayReaderThread extends Thread {
+
+    public ReplayReaderThread(String threadName) {
+      super(threadName);
+    }
+
+
+    public void run() {
+      try {
+        startFlag.await();
+        if (Thread.currentThread().isInterrupted()) {
+          return;
+        }
+        final long initTime = TimeUnit.MILLISECONDS.convert(
+          System.nanoTime(), TimeUnit.NANOSECONDS);
+        LOG.info("START REPLAY @ " + initTime);
+        long first = -1;
+        long last = -1;
+        while (!Thread.currentThread().isInterrupted()) {
+          try {
+            final JobStory job = getNextJobFiltered();
+            if (null == job) {
+              return;
+            }
+            if (first < 0) {
+              first = job.getSubmissionTime();
+            }
+            final long current = job.getSubmissionTime();
+            if (current < last) {
+              LOG.warn("Job " + job.getJobID() + " out of order");
+              continue;
+            }
+            last = current;
+            submitter.add(
+              new GridmixJob(
+                conf, initTime + Math.round(rateFactor * (current - first)),
+                job, scratch, userResolver.getTargetUgi(
+                  UserGroupInformation.createRemoteUser(job.getUser())),
+                sequence.getAndIncrement()));
+          } catch (IOException e) {
+            error = e;
+            return;
+          }
+        }
+      } catch (InterruptedException e) {
+        // exit thread; ignore any jobs remaining in the trace
+      } finally {
+        IOUtils.cleanup(null, jobProducer);
+      }
+    }
+  }
+
+   /**
+    * Start the reader thread, wait for latch if necessary.
+    */
+   @Override
+   public void start() {
+     this.rThread.start();
+   }
+
+}
\ No newline at end of file

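ReplayReaderThread preserves the trace's inter-arrival gaps: each job's target submission time is the replay start time plus the trace offset scaled by rateFactor. A worked example with illustrative numbers, assuming rateFactor stretches trace gaps:

    long initTime = 1000000L;    // wall-clock ms when replay started
    long first    = 50000L;      // trace submission time of the first job
    long current  = 110000L;     // trace submission time of this job
    float rateFactor = 2.0f;     // 2.0 doubles the gaps (half speed)
    long target = initTime + Math.round(rateFactor * (current - first));
    // target == 1000000 + round(2.0f * 60000) == 1120000
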
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RoundRobinUserResolver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RoundRobinUserResolver.java?rev=1077370&r1=1077369&r2=1077370&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RoundRobinUserResolver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RoundRobinUserResolver.java Fri Mar  4 04:08:06 2011
@@ -17,9 +17,18 @@
  */
 package org.apache.hadoop.mapred.gridmix;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.LineReader;
+
 import java.io.IOException;
 import java.net.URI;
-import java.util.Collection;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -29,14 +38,57 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 
-public class RoundRobinUserResolver extends UserResolver {
+public class RoundRobinUserResolver implements UserResolver {
+  public static final Log LOG = LogFactory.getLog(RoundRobinUserResolver.class);
 
   private int uidx = 0;
   private List<UserGroupInformation> users = Collections.emptyList();
   private final HashMap<UserGroupInformation,UserGroupInformation> usercache =
     new HashMap<UserGroupInformation,UserGroupInformation>();
-
-  public RoundRobinUserResolver() { }
+  
+  /**
+   * Userlist assumes one UGI per line, each UGI matching
+   * &lt;username&gt;,&lt;group&gt;[,group]*
+   */
+  private List<UserGroupInformation> parseUserList(
+      URI userUri, Configuration conf) throws IOException {
+    if (null == userUri) {
+      return Collections.emptyList();
+    }
+    
+    final Path userloc = new Path(userUri.toString());
+    final Text rawUgi = new Text();
+    final FileSystem fs = userloc.getFileSystem(conf);
+    final ArrayList<UserGroupInformation> ret = new ArrayList();
+
+    LineReader in = null;
+    try {
+      final ArrayList<String> groups = new ArrayList();
+      in = new LineReader(fs.open(userloc));
+      while (in.readLine(rawUgi) > 0) {
+        int e = rawUgi.find(",");
+        if (e <= 0) {
+          throw new IOException("Missing username: " + rawUgi);
+        }
+        final String username = Text.decode(rawUgi.getBytes(), 0, e);
+        int s = e;
+        while ((e = rawUgi.find(",", ++s)) != -1) {
+          groups.add(Text.decode(rawUgi.getBytes(), s, e - s));
+          s = e;
+        }
+        groups.add(Text.decode(rawUgi.getBytes(), s, rawUgi.getLength() - s));
+        if (groups.size() == 0) {
+          throw new IOException("Missing groups: " + rawUgi);
+        }
+        ret.add(UserGroupInformation.createRemoteUser(username));
+      }
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+    }
+    return ret;
+  }
 
   @Override
   public synchronized boolean setTargetUsers(URI userloc, Configuration conf)
@@ -57,7 +109,14 @@ public class RoundRobinUserResolver exte
       ret = users.get(uidx++ % users.size());
       usercache.put(ugi, ret);
     }
-    return ret;
+    UserGroupInformation val = null;
+    try {
+      val = UserGroupInformation.createProxyUser(
+        ret.getUserName(), UserGroupInformation.getLoginUser());
+    } catch (IOException e) {
+      LOG.error("Error while creating the proxy user " ,e);
+    }
+    return val;
   }
 
 }

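getTargetUgi now hands back a proxy UGI: the round-robin user's name authenticated through the login user's credentials. On a secure cluster this only works if the submitting principal is allowed to impersonate, which is configured server-side; the key names below are the standard Hadoop proxy-user settings, not part of this patch:

    // Client side: act as "user1" with the gridmix submitter's credentials.
    UserGroupInformation proxy = UserGroupInformation.createProxyUser(
        "user1", UserGroupInformation.getLoginUser());

    // Cluster side (core-site.xml), assuming the submitter runs as "gridmix":
    //   hadoop.proxyuser.gridmix.groups = <groups that may be impersonated>
    //   hadoop.proxyuser.gridmix.hosts  = <hosts impersonation is allowed from>
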
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java?rev=1077370&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java Fri Mar  4 04:08:06 2011
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.Condition;
+
+public class SerialJobFactory extends JobFactory<JobStats> {
+
+  public static final Log LOG = LogFactory.getLog(SerialJobFactory.class);
+  private final Condition jobCompleted = lock.newCondition();
+
+  /**
+   * Creating a new instance does not start the thread.
+   *
+   * @param submitter   Component to which deserialized jobs are passed
+   * @param jobProducer Job story producer
+   *                    {@link org.apache.hadoop.tools.rumen.ZombieJobProducer}
+   * @param scratch     Directory into which to write output from simulated jobs
+   * @param conf        Config passed to all jobs to be submitted
+   * @param startFlag   Latch released from main to start pipeline
+   * @throws java.io.IOException
+   */
+  public SerialJobFactory(
+    JobSubmitter submitter, JobStoryProducer jobProducer, Path scratch,
+    Configuration conf, CountDownLatch startFlag, UserResolver resolver)
+    throws IOException {
+    super(submitter, jobProducer, scratch, conf, startFlag, resolver);
+  }
+
+  @Override
+  public Thread createReaderThread() {
+    return new SerialReaderThread("SerialJobFactory");
+  }
+
+  private class SerialReaderThread extends Thread {
+
+    public SerialReaderThread(String threadName) {
+      super(threadName);
+    }
+
+    /**
+     * SERIAL: in this scenario the method waits for notification that
+     * the previously submitted job has actually completed. The logic:
+     * ===
+     * while (true) {
+     *   wait till the previous job is completed;
+     *   break;
+     * }
+     * submit newJob;
+     * previousJob = newJob;
+     * ==
+     */
+    @Override
+    public void run() {
+      try {
+        startFlag.await();
+        if (Thread.currentThread().isInterrupted()) {
+          return;
+        }
+        LOG.info("START SERIAL @ " + System.currentTimeMillis());
+        GridmixJob prevJob;
+        while (!Thread.currentThread().isInterrupted()) {
+          final JobStory job;
+          try {
+            job = getNextJobFiltered();
+            if (null == job) {
+              return;
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                "Serial mode submitting job " + job.getName());
+            }
+            prevJob = new GridmixJob(
+              conf, 0L, job, scratch, userResolver.getTargetUgi(
+                UserGroupInformation.createRemoteUser(job.getUser())),
+              sequence.getAndIncrement());
+
+            lock.lock();
+            try {
+              LOG.info(" Submitted the job " + prevJob);
+              submitter.add(prevJob);
+            } finally {
+              lock.unlock();
+            }
+          } catch (IOException e) {
+            error = e;
+            // If submission of the current job fails, record it and stop.
+            return;
+          }
+
+          if (prevJob != null) {
+            //Wait till previous job submitted is completed.
+            lock.lock();
+            try {
+              while (true) {
+                try {
+                  jobCompleted.await();
+                } catch (InterruptedException ie) {
+                  LOG.error(
+                    " Error in SerialJobFactory while waiting for job completion ",
+                    ie);
+                  return;
+                }
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Job " + job.getName() + " completed");
+                }
+                break;
+              }
+            } finally {
+              lock.unlock();
+            }
+            prevJob = null;
+          }
+        }
+      } catch (InterruptedException e) {
+        return;
+      } finally {
+        IOUtils.cleanup(null, jobProducer);
+      }
+    }
+
+  }
+
+  /**
+   * SERIAL. Once the StatsCollector notifies us that a job has completed,
+   * simply signal the waiting reader thread.
+   *
+   * @param item statistics of the completed job
+   */
+  @Override
+  public void update(Statistics.JobStats item) {
+    //Simply signal in case of serial submission; we only care whether the
+    //submitted job has completed.
+    lock.lock();
+    try {
+      jobCompleted.signalAll();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Start the reader thread, wait for latch if necessary.
+   */
+  @Override
+  public void start() {
+    LOG.info(" Starting Serial submission ");
+    this.rThread.start();
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StatListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StatListener.java?rev=1077370&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StatListener.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StatListener.java Fri Mar  4 04:08:06 2011
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.gridmix;
+
+/**
+ * Listener notified whenever the statistics it has registered for are
+ * updated.
+ * @param <T> Type of the statistics snapshot delivered on each update
+ */
+interface StatListener<T>{
+
+  /**
+   * Consume an updated statistics snapshot.
+   * @param item the updated statistics
+   */
+  void update(T item);
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java?rev=1077370&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java Fri Mar  4 04:08:06 2011
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.hadoop.mapred.gridmix.Gridmix.Component;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.io.IOException;
+
+/**
+ * Component collecting the stats required by other components
+ * to make decisions.
+ * A single collector thread polls the cluster and, on each poll, updates
+ * a shared data structure (currently ClusterStats). Components interested
+ * in these updates need to register as listeners; the StatsCollector then
+ * notifies each of them.
+ */
+public class Statistics implements Component<Job> {
+  public static final Log LOG = LogFactory.getLog(Statistics.class);
+
+  private final StatCollector statistics = new StatCollector();
+  private JobClient cluster;
+
+  //List of cluster status listeners.
+  private final List<StatListener<ClusterStats>> clusterStatlisteners =
+    new ArrayList<StatListener<ClusterStats>>();
+
+  //List of job status listeners.
+  private final List<StatListener<JobStats>> jobStatListeners =
+    new ArrayList<StatListener<JobStats>>();
+
+  private int completedJobsInCurrentInterval = 0;
+  private final int jtPollingInterval;
+  private volatile boolean shutdown = false;
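+  //Number of job completions to accumulate in a polling interval before
+  //notifying JobStats listeners and waking the collector (default 1).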
+  private final int maxJobCompletedInInterval;
+  private static final String MAX_JOBS_COMPLETED_IN_POLL_INTERVAL_KEY =
+    "gridmix.max-jobs-completed-in-poll-interval";
+  private final ReentrantLock lock = new ReentrantLock();
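+  //Wakes the collector thread before its polling interval expires once
+  //enough jobs have completed (see add(Job)).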
+  private final Condition jobCompleted = lock.newCondition();
+  private final CountDownLatch startFlag;
+  private final UserResolver userResolver;
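+  //Cache of map task reports per running job, used by listeners such as
+  //StressJobFactory to estimate incomplete map tasks; entries are removed
+  //as jobs complete.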
+  private static Map<JobID, TaskReport[]> jobTaskReports =
+    new ConcurrentHashMap<JobID, TaskReport[]>();
+
+  public Statistics(
+    final Configuration conf, int pollingInterval, CountDownLatch startFlag,UserResolver userResolver)
+    throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+    this.userResolver = userResolver;
+    try {
+      this.cluster = ugi.doAs(new PrivilegedExceptionAction<JobClient>(){
+        public JobClient run() {
+          try {
+            return new JobClient(new JobConf(conf));
+          } catch (IOException e) {
+            LOG.error(" error while createing job client " + e.getMessage());
+          }
+          return null;
+        }
+      });
+    } catch (InterruptedException e) {
+      LOG.error(" Exception in statisitics " + e.getMessage());
+    } catch (IOException e) {
+      LOG.error("Exception in statistics " + e.getMessage());
+    }
+    this.jtPollingInterval = pollingInterval;
+    maxJobCompletedInInterval = conf.getInt(
+      MAX_JOBS_COMPLETED_IN_POLL_INTERVAL_KEY, 1);
+    this.startFlag = startFlag;
+  }
+
+  /**
+   * Used by JobMonitor to add the completed job.
+   */
+  @Override
+  public void add(Job job) {
+    //The JobMonitor also notifies this component for the data-generation
+    //job. Ignore that notification: it arrives before the collector thread
+    //has started, i.e. while the input was still being generated.
+    if(!statistics.isAlive()) {
+      return;
+    }
+    completedJobsInCurrentInterval++;
+    if (job.getJobID() != null) {
+      jobTaskReports.remove(job.getJobID());
+    }
+    //Check if we have reached the maximum number of job completions for
+    //the current polling interval.
+    if (completedJobsInCurrentInterval >= maxJobCompletedInInterval) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          " Reached maximum limit of jobs in a polling interval " +
+            completedJobsInCurrentInterval);
+      }
+      completedJobsInCurrentInterval = 0;
+      lock.lock();
+      try {
+        //A job has completed; notify all the listeners.
+        if (jobStatListeners.size() > 0) {
+          for (StatListener<JobStats> l : jobStatListeners) {
+            JobStats stats = new JobStats();
+            stats.setCompleteJob(job);
+            l.update(stats);
+          }
+        }
+        this.jobCompleted.signalAll();
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  //TODO: We have just two types of listeners as of now. If the number of
+  //listener types increases, we should move to a map-based model.
+
+  public void addClusterStatsObservers(StatListener<ClusterStats> listener) {
+    clusterStatlisteners.add(listener);
+  }
+
+  public void addJobStatsListeners(StatListener<JobStats> listener) {
+    this.jobStatListeners.add(listener);
+  }
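+  //Illustrative wiring (not part of this patch): a submission policy that
+  //reacts to cluster load registers as a ClusterStats listener, while one
+  //that reacts to job completion registers as a JobStats listener, e.g.:
+  //  statistics.addClusterStatsObservers(stressJobFactory);
+  //  statistics.addJobStatsListeners(serialJobFactory);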
+
+  /**
+   * Attempt to start the service.
+   */
+  @Override
+  public void start() {
+    statistics.start();
+  }
+
+  private class StatCollector extends Thread {
+
+    StatCollector() {
+      super("StatsCollectorThread");
+    }
+
+    public void run() {
+      try {
+        startFlag.await();
+        if (Thread.currentThread().isInterrupted()) {
+          return;
+        }
+      } catch (InterruptedException ie) {
+        LOG.error(
+          "Statistics error while waiting for other threads to get ready ", ie);
+        return;
+      }
+      while (!shutdown) {
+        lock.lock();
+        try {
+          jobCompleted.await(jtPollingInterval, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ie) {
+          LOG.error(
+            "Statistics interrupted while waiting between polls ", ie);
+          return;
+        } finally {
+          lock.unlock();
+        }
+
+        //Fetch cluster data only if required, i.e. only if there are
+        //ClusterStats listeners registered.
+        if (clusterStatlisteners.size() > 0) {
+          try {
+            ClusterStatus clusterStatus = cluster.getClusterStatus();
+            JobStatus[] allJobs = cluster.getAllJobs();
+            List<JobStatus> runningWaitingJobs = getRunningWaitingJobs(allJobs);
+            getJobReports(runningWaitingJobs);
+            updateAndNotifyClusterStatsListeners(
+              clusterStatus, runningWaitingJobs);
+          } catch (IOException e) {
+            LOG.error(
+              "Statistics IOException while polling the JT ", e);
+            return;
+          } catch (InterruptedException e) {
+            LOG.error(
+              "Statistics interrupted while polling the JT ", e);
+            return;
+          }
+        }
+      }
+    }
+
+    private void updateAndNotifyClusterStatsListeners(
+      ClusterStatus clusterStatus, List<JobStatus> runningWaitingJobs) {
+      ClusterStats stats = ClusterStats.getClusterStats();
+      stats.setClusterMetric(clusterStatus);
+      stats.setRunningWaitingJobs(runningWaitingJobs);
+      for (StatListener<ClusterStats> listener : clusterStatlisteners) {
+        listener.update(stats);
+      }
+    }
+
+    private void getJobReports(List<JobStatus> jobs) throws IOException {
+      for (final JobStatus job : jobs) {
+        UserGroupInformation user = userResolver.getTargetUgi(
+          UserGroupInformation.createRemoteUser(job.getUsername()));
+        try {
+          user.doAs(
+            new PrivilegedExceptionAction<Void>() {
+              public Void run() {
+                JobID id = job.getJobID();
+                if (!jobTaskReports.containsKey(id)) {
+                  try {
+                    jobTaskReports.put(
+                      id, cluster.getMapTaskReports(
+                        org.apache.hadoop.mapred.JobID.downgrade(id)));
+                  } catch (IOException e) {
+                    LOG.error(
+                      " Couldnt get the MapTaskResports for "+ job.getJobId());
+                  }
+                }
+                return null;
+              }
+            });
+        } catch (InterruptedException e) {
+          LOG.error(
+            "Could not get information for user " + user + " and job " +
+              job.getJobId());
+        } catch (IOException e) {
+          LOG.error(
+            "Could not get information for user " + user + " and job " +
+              job.getJobId());
+          throw new IOException(e);
+        }
+      }
+    }
+
+    /**
+     * From the given list of jobs, return those whose state is either
+     * PREP or RUNNING.
+     *
+     * @param allJobs all jobs reported by the cluster
+     * @return the jobs in PREP or RUNNING state
+     * @throws java.io.IOException
+     * @throws InterruptedException
+     */
+    private List<JobStatus> getRunningWaitingJobs(JobStatus[] allJobs)
+      throws IOException, InterruptedException {
+      List<JobStatus> result = new ArrayList<JobStatus>();
+      for (JobStatus job : allJobs) {
+        //TODO Check if job.getStatus() makes a rpc call
+        int state = job.getRunState();
+        if (JobStatus.PREP == state || JobStatus.RUNNING == state) {
+          result.add(job);
+        }
+      }
+      return result;
+    }
+
+  }
+
+  /**
+   * Wait until the service completes. It is assumed that either a
+   * {@link #shutdown} or {@link #abort} has been requested.
+   */
+  @Override
+  public void join(long millis) throws InterruptedException {
+    statistics.join(millis);
+  }
+
+  @Override
+  public void shutdown() {
+    shutdown = true;
+    jobTaskReports.clear();
+    clusterStatlisteners.clear();
+    jobStatListeners.clear();
+    statistics.interrupt();
+  }
+
+  @Override
+  public void abort() {
+    shutdown = true;
+    jobTaskReports.clear();
+    clusterStatlisteners.clear();
+    jobStatListeners.clear();
+    statistics.interrupt();
+  }
+
+  /**
+   * Class to encapsulate the JobStats information.
+   * Currently we just need information about the completed job.
+   * TODO: In future we need to extend this to send more information.
+   */
+  static class JobStats {
+    private Job completedJob;
+
+    public Job getCompleteJob() {
+      return completedJob;
+    }
+
+    public void setCompleteJob(Job job) {
+      this.completedJob = job;
+    }
+  }
+
+  static class ClusterStats {
+    private ClusterStatus status = null;
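+    //Shared singleton snapshot: refreshed by the collector thread on each
+    //poll and handed to every registered ClusterStats listener.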
+    private static ClusterStats stats = new ClusterStats();
+    private List<JobStatus> runningWaitingJobs;
+
+    private ClusterStats() {
+
+    }
+
+    /**
+     * @return stats
+     */
+    static ClusterStats getClusterStats() {
+      return stats;
+    }
+
+    /**
+     * @param metrics
+     */
+    void setClusterMetric(ClusterStatus metrics) {
+      this.status = metrics;
+    }
+
+    /**
+     * @return metrics
+     */
+    public ClusterStatus getStatus() {
+      return status;
+    }
+
+    /**
+     * @return runningWaitingJobs
+     */
+    public List<JobStatus> getRunningWaitingJobs() {
+      return runningWaitingJobs;
+    }
+
+    public void setRunningWaitingJobs(List<JobStatus> runningWaitingJobs) {
+      this.runningWaitingJobs = runningWaitingJobs;
+    }
+
+    public Map<JobID, TaskReport[]> getJobReports() {
+      return jobTaskReports;
+    }
+
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java?rev=1077370&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java Fri Mar  4 04:08:06 2011
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.Condition;
+import java.util.List;
+
+public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
+  public static final Log LOG = LogFactory.getLog(StressJobFactory.class);
+
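+  //Latest view of the JT load: written under the lock by
+  //update(ClusterStats) and read by the reader thread.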
+  private LoadStatus loadStatus = new LoadStatus();
+  private List<JobStatus> runningWaitingJobs;
+  private final Condition overloaded = this.lock.newCondition();
+  /**
+   * The minimum ratio between pending+running map tasks (aka. incomplete map
+   * tasks) and cluster map slot capacity for us to consider the cluster is
+   * overloaded. For running maps, we only count them partially. Namely, a 40%
+   * completed map is counted as 0.6 map tasks in our calculation.
+   */
+  static final float OVERLAOD_MAPTASK_MAPSLOT_RATIO = 2.0f;
+
+  /**
+   * Creating a new instance does not start the thread.
+   *
+   * @param submitter   Component to which deserialized jobs are passed
+   * @param jobProducer Stream of job traces with which to construct a
+   *                    {@link org.apache.hadoop.tools.rumen.ZombieJobProducer}
+   * @param scratch     Directory into which to write output from simulated jobs
+   * @param conf        Config passed to all jobs to be submitted
+   * @param startFlag   Latch released from main to start pipeline
+   * @throws java.io.IOException
+   */
+  public StressJobFactory(
+    JobSubmitter submitter, JobStoryProducer jobProducer, Path scratch,
+    Configuration conf, CountDownLatch startFlag, UserResolver resolver)
+    throws IOException {
+    super(
+      submitter, jobProducer, scratch, conf, startFlag, resolver);
+
+    //Set isOverloaded to true so the factory waits for at least the first
+    //set of ClusterStats before deciding how many jobs to submit.
+    this.loadStatus.isOverloaded = true;
+  }
+
+  public Thread createReaderThread() {
+    return new StressReaderThread("StressJobFactory");
+  }
+
+  /*
+  * Worker thread responsible for reading descriptions, assigning sequence
+  * numbers, and normalizing time.
+  */
+  private class StressReaderThread extends Thread {
+
+    public StressReaderThread(String name) {
+      super(name);
+    }
+
+    /**
+     * STRESS: Submits jobs in STRESS mode.
+     * <pre>
+     * while (JT is overloaded) {
+     *   wait();
+     * }
+     * </pre>
+     * If not overloaded, get the number of slots available and keep
+     * submitting jobs until the submitted work is sufficient to load the
+     * JT, that is, until Sigma(maps per job) &gt; (2 * slots available).
+     */
+    public void run() {
+      try {
+        startFlag.await();
+        if (Thread.currentThread().isInterrupted()) {
+          return;
+        }
+        LOG.info("START STRESS @ " + System.currentTimeMillis());
+        while (!Thread.currentThread().isInterrupted()) {
+          lock.lock();
+          try {
+            while (loadStatus.isOverloaded) {
+              //Wait while JT is overloaded.
+              try {
+                overloaded.await();
+              } catch (InterruptedException ie) {
+                return;
+              }
+            }
+
+            int noOfSlotsAvailable = loadStatus.numSlotsBackfill;
+            LOG.info(" No of slots to be backfilled are " + noOfSlotsAvailable);
+
+            for (int i = 0; i < noOfSlotsAvailable; i++) {
+              try {
+                final JobStory job = getNextJobFiltered();
+                if (null == job) {
+                  return;
+                }
+                //TODO: Handle the case where a single map task occupies
+                //more than one slot.
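+                //Each job consumes one backfill slot per map task, so
+                //advance the loop counter by the job's map count.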
+                i += job.getNumberMaps();
+
+                submitter.add(
+                  new GridmixJob(
+                    conf, 0L, job, scratch, userResolver.getTargetUgi(
+                      UserGroupInformation.createRemoteUser(
+                        job.getUser())), sequence.getAndIncrement()));
+              } catch (IOException e) {
+                LOG.error(" EXCEPTOIN in availableSlots ", e);
+                error = e;
+                return;
+              }
+
+            }
+          } finally {
+            lock.unlock();
+          }
+        }
+      } catch (InterruptedException e) {
+        return;
+      } finally {
+        IOUtils.cleanup(null, jobProducer);
+      }
+    }
+  }
+
+  /**
+   * STRESS: Once notified by the StatsCollector, collect the cluster
+   * metrics and update the current loadStatus with the new load status
+   * of the JT.
+   *
+   * @param item the latest cluster statistics
+   */
+  @Override
+  public void update(Statistics.ClusterStats item) {
+    lock.lock();
+    try {
+      ClusterStatus clusterMetrics = item.getStatus();
+      LoadStatus newStatus;
+      runningWaitingJobs = item.getRunningWaitingJobs();
+      newStatus = checkLoadAndGetSlotsToBackfill(item, clusterMetrics);
+      loadStatus.isOverloaded = newStatus.isOverloaded;
+      loadStatus.numSlotsBackfill = newStatus.numSlotsBackfill;
+      overloaded.signalAll();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * We try to use some light-weight mechanism to determine cluster load.
+   *
+   * @param stats         the latest cluster statistics
+   * @param clusterStatus current cluster metrics
+   * @return the computed load status: whether, from the job client's
+   *         perspective, the cluster is overloaded and, if not, how many
+   *         map slots can be backfilled
+   */
+  private LoadStatus checkLoadAndGetSlotsToBackfill(
+    Statistics.ClusterStats stats, ClusterStatus clusterStatus) {
+    LoadStatus loadStatus = new LoadStatus();
+    // If there are more jobs than number of task trackers, we assume the
+    // cluster is overloaded. This is to bound the memory usage of the
+    // simulator job tracker, in situations where we have jobs with small
+    // number of map tasks and large number of reduce tasks.
+    if (runningWaitingJobs.size() >= clusterStatus.getTaskTrackers()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          System.currentTimeMillis() + " Overloaded is " +
+            Boolean.TRUE.toString() + " #runningJobs >= taskTrackerCount (" +
+            runningWaitingJobs.size() + " >= " +
+            clusterStatus.getTaskTrackers() + " )\n");
+      }
+      loadStatus.isOverloaded = true;
+      loadStatus.numSlotsBackfill = 0;
+      return loadStatus;
+    }
+
+    float incompleteMapTasks = 0; // include pending & running map tasks.
+    for (JobStatus job : runningWaitingJobs) {
+      incompleteMapTasks += (1 - Math.min(
+        job.mapProgress(), 1.0)) * stats.getJobReports().get(
+        job.getJobID()).length;
+    }
+
+    float overloadedThreshold =
+      OVERLAOD_MAPTASK_MAPSLOT_RATIO * clusterStatus.getMaxMapTasks();
+    boolean overloaded = incompleteMapTasks > overloadedThreshold;
+    String relOp = (overloaded) ? ">" : "<=";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+        System.currentTimeMillis() + " Overloaded is " + Boolean.toString(
+          overloaded) + " incompleteMapTasks " + relOp + " " +
+          OVERLAOD_MAPTASK_MAPSLOT_RATIO + "*mapSlotCapacity" + "(" +
+          incompleteMapTasks + " " + relOp + " " +
+          OVERLAOD_MAPTASK_MAPSLOT_RATIO + "*" +
+          clusterStatus.getMaxMapTasks() + ")");
+    }
+    if (overloaded) {
+      loadStatus.isOverloaded = true;
+      loadStatus.numSlotsBackfill = 0;
+    } else {
+      loadStatus.isOverloaded = false;
+      loadStatus.numSlotsBackfill =
+        (int) (overloadedThreshold - incompleteMapTasks);
+    }
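+    //Illustrative example: with 100 map slots the threshold is
+    //2.0 * 100 = 200 incomplete map tasks; if 150 map tasks are incomplete,
+    //the cluster is not overloaded and 200 - 150 = 50 slots are available
+    //for backfilling.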
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Current load Status is " + loadStatus);
+    }
+    return loadStatus;
+  }
+
+  static class LoadStatus {
+    volatile boolean isOverloaded = false;
+    volatile int numSlotsBackfill = -1;
+
+    public String toString() {
+      return " is Overloaded " + isOverloaded + " no of slots available " +
+        numSlotsBackfill;
+    }
+  }
+
+  /**
+   * Start the reader thread, wait for latch if necessary.
+   */
+  @Override
+  public void start() {
+    LOG.info(" Starting Stress submission ");
+    this.rThread.start();
+  }
+
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SubmitterUserResolver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SubmitterUserResolver.java?rev=1077370&r1=1077369&r2=1077370&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SubmitterUserResolver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SubmitterUserResolver.java Fri Mar  4 04:08:06 2011
@@ -19,28 +19,26 @@ package org.apache.hadoop.mapred.gridmix
 
 import java.io.IOException;
 import java.net.URI;
-import javax.security.auth.login.LoginException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Resolves all UGIs to the submitting user.
  */
-public class SubmitterUserResolver extends UserResolver {
-
+public class SubmitterUserResolver implements UserResolver {
+  public static final Log LOG = LogFactory.getLog(SubmitterUserResolver.class);
+  
   private UserGroupInformation ugi = null;
 
-  public SubmitterUserResolver() { }
+  public SubmitterUserResolver() {
+    LOG.info(" Current user resolver is SubmitterUserResolver ");
+  }
 
   public synchronized boolean setTargetUsers(URI userdesc, Configuration conf)
       throws IOException {
-    try {
-      ugi = UnixUserGroupInformation.login(conf, false);
-    } catch (LoginException e) {
-      throw new IOException("Failed to get submitter UGI", e);
-    }
+    ugi = UserGroupInformation.getLoginUser();
     return false;
   }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java?rev=1077370&r1=1077369&r2=1077370&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java Fri Mar  4 04:08:06 2011
@@ -27,59 +27,13 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.LineReader;
 
 /**
  * Maps users in the trace to a set of valid target users on the test cluster.
  */
-public abstract class UserResolver {
-
-  /**
-   * Userlist assumes one UGI per line, each UGI matching
-   * &lt;username&gt;,&lt;group&gt;[,group]*
-   */
-  protected List<UserGroupInformation> parseUserList(
-      URI userUri, Configuration conf) throws IOException {
-    if (null == userUri) {
-      return Collections.emptyList();
-    }
-    final Path userloc = new Path(userUri.toString());
-    final Text rawUgi = new Text();
-    final FileSystem fs = userloc.getFileSystem(conf);
-    final ArrayList<UserGroupInformation> ret = new ArrayList();
-
-    LineReader in = null;
-    try {
-      final ArrayList<String> groups = new ArrayList();
-      in = new LineReader(fs.open(userloc));
-      while (in.readLine(rawUgi) > 0) {
-        int e = rawUgi.find(",");
-        if (e <= 0) {
-          throw new IOException("Missing username: " + rawUgi);
-        }
-        final String username = Text.decode(rawUgi.getBytes(), 0, e);
-        int s = e;
-        while ((e = rawUgi.find(",", ++s)) != -1) {
-          groups.add(Text.decode(rawUgi.getBytes(), s, e - s));
-          s = e;
-        }
-        groups.add(Text.decode(rawUgi.getBytes(), s, rawUgi.getLength() - s));
-        if (groups.size() == 0) {
-          throw new IOException("Missing groups: " + rawUgi);
-        }
-        ret.add(new UnixUserGroupInformation(
-              username, groups.toArray(new String[groups.size()])));
-        groups.clear();
-      }
-    } finally {
-      if (in != null) {
-        in.close();
-      }
-    }
-    return ret;
-  }
+public interface UserResolver {
 
   /**
    * Configure the user map given the URI and configuration. The resolver's
@@ -93,20 +47,13 @@ public abstract class UserResolver {
    * @return true if the resource provided was used in building the list of
    * target users
    */
-  public abstract boolean setTargetUsers(URI userdesc, Configuration conf)
+  public boolean setTargetUsers(URI userdesc, Configuration conf)
     throws IOException;
 
-  // tmp compatibility hack prior to UGI from Rumen
-  public UserGroupInformation getTargetUgi(String user)
-      throws IOException {
-    return getTargetUgi(new UnixUserGroupInformation(
-          user, new String[] { "users" }));
-  }
-
   /**
    * Map the given UGI to another per the subclass contract.
    * @param ugi User information from the trace.
    */
-  public abstract UserGroupInformation getTargetUgi(UserGroupInformation ugi);
+  public UserGroupInformation getTargetUgi(UserGroupInformation ugi);
 
 }


