hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r1077515 [4/4] - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapreduce/ test/org/apache/hadoop/tools/rumen/ test/tools/data/rumen/small-trace-test/ test/tools/data/rumen/small-trace-test/counters-fo...
Date: Fri, 04 Mar 2011 04:23:01 GMT
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * The main driver of the Rumen Parser.
+ */
+public class TraceBuilder extends Configured implements Tool {
+  static final private Log LOG = LogFactory.getLog(TraceBuilder.class);
+
+  static final int RUN_METHOD_FAILED_EXIT_CODE = 3;
+
+  TopologyBuilder topologyBuilder = new TopologyBuilder();
+  JobConfigurationParser jobConfParser;
+  Outputter<LoggedJob> traceWriter;
+  Outputter<LoggedNetworkTopology> topologyWriter;
+
+  // Needs to be interpreted greedily or otherwise constrained
+  static final String jobIDRegex = "job_[0-9]+_[0-9]+";
+
+  // returns jobID in Capturing Group 1
+  static final Pattern confFileNameRegex =
+      Pattern.compile("[^.].+_(" + jobIDRegex
+          + ")_conf.xml(?:\\.[0-9a-zA-Z]+)?");
+
+  // This can match text that confFileNameRegex will also match. The code
+  // gives precedence to confFileNameRegex. Returns jobID
+  // in Capturing Group 1
+  static final Pattern jobFileNameRegex =
+      Pattern.compile("[^.].+_(" + jobIDRegex + ")_.+");
+
+  static class MyOptions {
+    Class<? extends InputDemuxer> inputDemuxerClass = DefaultInputDemuxer.class;
+
+    @SuppressWarnings("unchecked")
+    Class<? extends Outputter> clazzTraceOutputter = DefaultOutputter.class;
+    Path traceOutput;
+    Path topologyOutput;
+
+    List<Path> inputs = new LinkedList<Path>();
+
+    MyOptions(String[] args, Configuration conf) throws FileNotFoundException,
+        IOException, ClassNotFoundException {
+      int switchTop = 0;
+
+      while (args[switchTop].startsWith("-")) {
+        if (args[switchTop].equalsIgnoreCase("-demuxer")) {
+          inputDemuxerClass =
+              Class.forName(args[++switchTop]).asSubclass(InputDemuxer.class);
+
+          ++switchTop;
+        }
+      }
+
+      traceOutput = new Path(args[0 + switchTop]);
+      topologyOutput = new Path(args[1 + switchTop]);
+
+      for (int i = 2 + switchTop; i < args.length; ++i) {
+
+        Path thisPath = new Path(args[i]);
+
+        FileSystem fs = thisPath.getFileSystem(conf);
+        if (fs.getFileStatus(thisPath).isDir()) {
+          FileStatus[] statuses = fs.listStatus(thisPath);
+
+          List<String> dirNames = new ArrayList<String>();
+
+          for (FileStatus s : statuses) {
+            if (s.isDir()) continue;
+            String name = s.getPath().getName();
+
+            if (!(name.endsWith(".crc") || name.startsWith("."))) {
+              dirNames.add(name);
+            }
+          }
+
+          String[] sortableNames = dirNames.toArray(new String[dirNames.size()]);
+
+          Arrays.sort(sortableNames);
+
+          for (String dirName : sortableNames) {
+            inputs.add(new Path(thisPath, dirName));
+          }
+        } else {
+          inputs.add(thisPath);
+        }
+      }
+    }
+  }
+
+  public static void main(String[] args) {
+    TraceBuilder builder = new TraceBuilder();
+    int result = RUN_METHOD_FAILED_EXIT_CODE;
+
+    try {
+      result = ToolRunner.run(builder, args); 
+    } catch (Throwable t) {
+      t.printStackTrace(System.err);
+    } finally {
+      try {
+        builder.finish();
+      } finally {
+        if (result == 0) {
+          return;
+        }
+
+        System.exit(result);
+      }
+    }
+  }
+
+  private static String applyParser(String fileName, Pattern pattern) {
+    Matcher matcher = pattern.matcher(fileName);
+
+    if (!matcher.matches()) {
+      return null;
+    }
+
+    return matcher.group(1);
+  }
+
+  /**
+   * @param fileName the file name from which the jobID is extracted
+   * @return the jobID String, parsed out of the file name. We return a valid
+   *         String for either a history log file or a config file. Otherwise,
+   *         [especially for .crc files] we return null.
+   */
+  static String extractJobID(String fileName) {
+    return applyParser(fileName, jobFileNameRegex);
+  }
+
+  static boolean isJobConfXml(String fileName, InputStream input) {
+    return applyParser(fileName, confFileNameRegex) != null;
+  }
+
+  private void addInterestedProperties(List<String> interestedProperties,
+      String[] names) {
+    for (String name : names) {
+      interestedProperties.add(name);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public int run(String[] args) throws Exception {
+    MyOptions options = new MyOptions(args, getConf());
+    List<String> interestedProperties = new ArrayList<String>();
+    {
+      for (JobConfPropertyNames candidateSet : JobConfPropertyNames.values()) {
+        addInterestedProperties(interestedProperties, candidateSet
+            .getCandidates());
+      }
+    }
+    jobConfParser = new JobConfigurationParser(interestedProperties);
+    traceWriter = options.clazzTraceOutputter.newInstance();
+    traceWriter.init(options.traceOutput, getConf());
+    topologyWriter = new DefaultOutputter<LoggedNetworkTopology>();
+    topologyWriter.init(options.topologyOutput, getConf());
+
+    try {
+      JobBuilder jobBuilder = null;
+
+      for (Path p : options.inputs) {
+        InputDemuxer inputDemuxer = options.inputDemuxerClass.newInstance();
+
+        try {
+          inputDemuxer.bindTo(p, getConf());
+        } catch (IOException e) {
+          LOG.warn("Unable to bind Path " + p + " .  Skipping...", e);
+
+          continue;
+        }
+
+        Pair<String, InputStream> filePair = null;
+
+        try {
+          while ((filePair = inputDemuxer.getNext()) != null) {
+            RewindableInputStream ris =
+                new RewindableInputStream(filePair.second());
+
+            JobHistoryParser parser = null;
+
+            try {
+              String jobID = extractJobID(filePair.first());
+              if (jobID == null) {
+                LOG.warn("File skipped: Invalid file name: "
+                    + filePair.first());
+                continue;
+              }
+              if ((jobBuilder == null)
+                  || (!jobBuilder.getJobID().equals(jobID))) {
+                if (jobBuilder != null) {
+                  traceWriter.output(jobBuilder.build());
+                }
+                jobBuilder = new JobBuilder(jobID);
+              }
+
+              if (isJobConfXml(filePair.first(), ris)) {
+                processJobConf(jobConfParser.parse(ris.rewind()), jobBuilder);
+              } else {
+                parser = JobHistoryParserFactory.getParser(ris);
+                if (parser == null) {
+                  LOG.warn("File skipped: Cannot find suitable parser: "
+                      + filePair.first());
+                } else {
+                  processJobHistory(parser, jobBuilder);
+                }
+              }
+            } finally {
+              if (parser == null) {
+                ris.close();
+              } else {
+                parser.close();
+                parser = null;
+              }
+            }
+          }
+        } catch (Throwable t) {
+          if (filePair != null) {
+            LOG.warn("TraceBuilder got an error while processing the [possibly virtual] file
"
+                + filePair.first() + " within Path " + p , t);
+          }
+        } finally {
+          inputDemuxer.close();
+        }
+      }
+      if (jobBuilder != null) {
+        traceWriter.output(jobBuilder.build());
+        jobBuilder = null;
+      } else {
+        LOG.warn("No job found in traces: ");
+      }
+
+      topologyWriter.output(topologyBuilder.build());
+    } finally {
+      traceWriter.close();
+      topologyWriter.close();
+    }
+
+    return 0;
+  }
+
+  private void processJobConf(Properties properties, JobBuilder jobBuilder) {
+    jobBuilder.process(properties);
+    topologyBuilder.process(properties);
+  }
+
+  void processJobHistory(JobHistoryParser parser, JobBuilder jobBuilder)
+      throws IOException {
+    HistoryEvent e;
+    while ((e = parser.nextEvent()) != null) {
+      jobBuilder.process(e);
+      topologyBuilder.process(e);
+    }
+
+    parser.close();
+  }
+
+  void finish() {
+    IOUtils.cleanup(LOG, traceWriter, topologyWriter);
+  }
+}
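For readers tracing the driver flow above: MyOptions expects any switches first (currently only "-demuxer <classname>"), followed by the trace output path, the topology output path, and one or more input paths; a directory input is expanded into its non-dot, non-.crc children in sorted name order. Below is a minimal sketch of driving TraceBuilder programmatically through ToolRunner; the file: URIs are hypothetical placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.tools.rumen.TraceBuilder;
    import org.apache.hadoop.util.ToolRunner;

    public class BuildTraceSketch {
      public static void main(String[] args) throws Exception {
        // Argument layout per MyOptions: [switches] <traceOutput> <topologyOutput> <input>...
        int exit = ToolRunner.run(new Configuration(), new TraceBuilder(),
            new String[] {
                "file:///tmp/job-trace.json",  // traceOutput (JSON job trace)
                "file:///tmp/topology.json",   // topologyOutput (cluster topology)
                "file:///tmp/history-logs"     // input directory of history/conf files
            });
        System.exit(exit);
      }
    }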

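The filename patterns near the top of the class are the subtle part: isJobConfXml tries confFileNameRegex, which both recognizes "*_conf.xml" names (with an optional compression suffix) and captures the jobID, while the looser jobFileNameRegex handles the history files a demuxer may emit; the leading "[^.]" rejects dot-files such as checksum artifacts. A self-contained sketch of the same matching, on made-up file names in the 0.20 naming style:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class FileNameClassifierSketch {
      static final String jobIDRegex = "job_[0-9]+_[0-9]+";

      // Same two patterns as TraceBuilder; group 1 captures the jobID.
      static final Pattern confFileNameRegex =
          Pattern.compile("[^.].+_(" + jobIDRegex + ")_conf.xml(?:\\.[0-9a-zA-Z]+)?");
      static final Pattern jobFileNameRegex =
          Pattern.compile("[^.].+_(" + jobIDRegex + ")_.+");

      static String applyParser(String fileName, Pattern pattern) {
        Matcher matcher = pattern.matcher(fileName);
        return matcher.matches() ? matcher.group(1) : null;
      }

      public static void main(String[] args) {
        // Conf file: matched by confFileNameRegex -> job_201102131308_0021
        System.out.println(applyParser(
            "host_1299211200000_job_201102131308_0021_conf.xml", confFileNameRegex));
        // History file: rejected by confFileNameRegex, matched by jobFileNameRegex
        System.out.println(applyParser(
            "host_1299211200000_job_201102131308_0021_user_wordcount", jobFileNameRegex));
        // Dot-file (e.g. a .crc checksum): rejected by both -> null
        System.out.println(applyParser(
            ".host_1299211200000_job_201102131308_0021_conf.xml.crc", confFileNameRegex));
      }
    }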
Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Version20LogInterfaceUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Version20LogInterfaceUtils.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Version20LogInterfaceUtils.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Version20LogInterfaceUtils.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.TaskType;
+
+// This class exists to hold a bunch of static utils.  It's never instantiated.
+abstract class Version20LogInterfaceUtils {
+
+  static TaskType get20TaskType(String taskType) {
+    try {
+      return TaskType.valueOf(taskType);
+    } catch (IllegalArgumentException e) {
+      if ("CLEANUP".equals(taskType)) {
+        return TaskType.JOB_CLEANUP;
+      }
+
+      if ("SETUP".equals(taskType)) {
+        return TaskType.JOB_SETUP;
+      }
+
+      return null;
+    }
+  }
+
+}
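This helper exists because 0.20-era history logs record the bare tokens "CLEANUP" and "SETUP", which the newer org.apache.hadoop.mapreduce.TaskType enum spells JOB_CLEANUP and JOB_SETUP; every other token is tried verbatim against the enum, and unknown tokens map to null. A standalone sketch of the same fallback, with a local enum mirroring the relevant TaskType constants so it runs without Hadoop on the classpath:

    public class TaskTypeMappingSketch {
      // Stand-in for org.apache.hadoop.mapreduce.TaskType.
      enum TaskType { MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP }

      static TaskType get20TaskType(String taskType) {
        try {
          return TaskType.valueOf(taskType);   // direct hit for MAP, REDUCE, ...
        } catch (IllegalArgumentException e) {
          if ("CLEANUP".equals(taskType)) {
            return TaskType.JOB_CLEANUP;       // 0.20 log token for job cleanup
          }
          if ("SETUP".equals(taskType)) {
            return TaskType.JOB_SETUP;         // 0.20 log token for job setup
          }
          return null;                         // unrecognized token
        }
      }

      public static void main(String[] args) {
        System.out.println(get20TaskType("MAP"));     // MAP
        System.out.println(get20TaskType("CLEANUP")); // JOB_CLEANUP
        System.out.println(get20TaskType("FOO"));     // null
      }
    }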

Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java Fri Mar  4 04:22:59 2011
@@ -45,7 +45,7 @@ public class ZombieCluster extends Abstr
    * @param defaultNode
    *          The default node setting.
    */
-  ZombieCluster(LoggedNetworkTopology topology, MachineNode defaultNode) {
+  public ZombieCluster(LoggedNetworkTopology topology, MachineNode defaultNode) {
     buildCluster(topology, defaultNode);
   }
 
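The only change in this file widens the constructor from package-private to public, so code outside org.apache.hadoop.tools.rumen (such as a simulator consuming Rumen traces) can build a ZombieCluster from an already-parsed topology. A minimal sketch of what now compiles; the package and class names below are hypothetical:

    package com.example.sim;

    import org.apache.hadoop.tools.rumen.LoggedNetworkTopology;
    import org.apache.hadoop.tools.rumen.MachineNode;
    import org.apache.hadoop.tools.rumen.ZombieCluster;

    public class ClusterFactory {
      // Before this commit, the ZombieCluster constructor was package-private
      // and this method could not live outside org.apache.hadoop.tools.rumen.
      public static ZombieCluster fromTopology(LoggedNetworkTopology topology,
          MachineNode defaultNode) {
        return new ZombieCluster(topology, defaultNode);
      }
    }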

Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java Fri Mar  4 04:22:59 2011
@@ -57,6 +57,7 @@ public class ZombieJob implements JobSto
   private JobConf jobConf;
 
   private long seed;
+  private long numRandomSeeds = 0;
   private boolean hasRandomSeed = false;
 
   private Map<LoggedDiscreteCDF, CDFRandomGenerator> interpolatorMap =
@@ -195,7 +196,8 @@ public class ZombieJob implements JobSto
         if (cluster == null) {
           splitsList.add(new FileSplit(emptyPath, 0, 0, new String[0]));
         } else {
-          MachineNode[] mNodes = cluster.getRandomMachines(avgHostPerSplit);
+          MachineNode[] mNodes = cluster.getRandomMachines(avgHostPerSplit,
+                                                           random);
           String[] hosts = new String[mNodes.length];
           for (int j = 0; j < hosts.length; ++j) {
             hosts[j] = mNodes[j].getName();
@@ -298,8 +300,8 @@ public class ZombieJob implements JobSto
   private TaskAttemptID maskAttemptID(TaskAttemptID attemptId) {
     JobID jobId = new JobID();
     TaskID taskId = attemptId.getTaskID();
-    return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(),
-        attemptId.isMap(), taskId.getId(), attemptId.getId());
+    return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), taskId
+        .isMap(), taskId.getId(), attemptId.getId());
   }
 
   private LoggedTask sanitizeLoggedTask(LoggedTask task) {
@@ -666,7 +668,7 @@ public class ZombieJob implements JobSto
   private TaskAttemptID makeTaskAttemptID(TaskType taskType, int taskNumber,
       int taskAttemptNumber) {
     return new TaskAttemptID(new TaskID(JobID.forName(job.getJobID()),
-        TaskType.MAP == taskType, taskNumber), taskAttemptNumber);
+        taskType==TaskType.MAP, taskNumber), taskAttemptNumber);
   }
   
   private TaskAttemptInfo makeUpTaskAttemptInfo(TaskType taskType, TaskInfo taskInfo,
@@ -799,7 +801,13 @@ public class ZombieJob implements JobSto
 
     return makeUpRuntimeCore(loggedDiscreteCDF);
   }
-
+  
+  private synchronized long getNextRandomSeed() {
+    numRandomSeeds++;
+    return RandomSeedGenerator.getSeed("forZombieJob" + job.getJobID(),
+                                       numRandomSeeds);
+  }
+   
   private long makeUpRuntimeCore(LoggedDiscreteCDF loggedDiscreteCDF) {
     CDFRandomGenerator interpolator;
 
@@ -814,7 +822,7 @@ public class ZombieJob implements JobSto
 
       interpolator =
           hasRandomSeed ? new CDFPiecewiseLinearRandomGenerator(
-              loggedDiscreteCDF, ++seed)
+              loggedDiscreteCDF, getNextRandomSeed())
               : new CDFPiecewiseLinearRandomGenerator(loggedDiscreteCDF);
 
       /*
@@ -867,7 +875,7 @@ public class ZombieJob implements JobSto
   }
 
   private TaskID getMaskedTaskID(TaskType taskType, int taskNumber) {
-    return new TaskID(new JobID(), TaskType.MAP == taskType, taskNumber);
+    return new TaskID(new JobID(), taskType==TaskType.MAP, taskNumber);
   }
 
   private LoggedTask getLoggedTask(TaskType taskType, int taskNumber) {

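The seeding changes above move ZombieJob from the old "++seed" scheme to per-use seeds obtained from RandomSeedGenerator.getSeed(streamId, seed), keyed by a stream name ("forZombieJob" + jobID) plus a counter or master seed. RandomSeedGenerator itself is not part of this message; the sketch below only illustrates the underlying technique — deriving a stable 64-bit sub-seed by hashing the stream name together with the master seed — and should not be read as the actual RandomSeedGenerator implementation:

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    public class SubSeedSketch {
      // Derive a deterministic 64-bit seed from a stream name and a master seed:
      // same inputs always give the same seed; distinct stream names decorrelate.
      static long getSeed(String streamId, long masterSeed) {
        try {
          MessageDigest md5 = MessageDigest.getInstance("MD5");
          byte[] digest =
              md5.digest((streamId + "/" + masterSeed).getBytes(StandardCharsets.UTF_8));
          long seed = 0;
          for (int i = 0; i < 8; i++) {
            seed = (seed << 8) | (digest[i] & 0xff);  // fold first 8 digest bytes
          }
          return seed;
        } catch (NoSuchAlgorithmException e) {
          throw new AssertionError("MD5 is required of every JRE", e);
        }
      }

      public static void main(String[] args) {
        System.out.println(getSeed("forZombieJob" + "job_201102131308_0021", 42L));
        System.out.println(getSeed("forZombieJob" + "job_201102131308_0022", 42L));
      }
    }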
Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java Fri Mar  4 04:22:59 2011
@@ -30,9 +30,15 @@ public class ZombieJobProducer implement
   private final JobTraceReader reader;
   private final ZombieCluster cluster;
 
-  private ZombieJobProducer(JobTraceReader reader, ZombieCluster cluster) {
+  private boolean hasRandomSeed = false;
+  private long randomSeed = 0;
+      
+  private ZombieJobProducer(JobTraceReader reader, ZombieCluster cluster,
+      boolean hasRandomSeed, long randomSeed) {
     this.reader = reader;
     this.cluster = cluster;
+    this.hasRandomSeed = hasRandomSeed;
+    this.randomSeed = (hasRandomSeed) ? randomSeed : System.nanoTime();
   }
 
   /**
@@ -49,9 +55,29 @@ public class ZombieJobProducer implement
    */
   public ZombieJobProducer(Path path, ZombieCluster cluster, Configuration conf)
       throws IOException {
-    this(new JobTraceReader(path, conf), cluster);
+    this(new JobTraceReader(path, conf), cluster, false, -1);
   }
 
+  
+  /**
+   * Constructor
+   * 
+   * @param path
+   *          Path to the JSON trace file, possibly compressed.
+   * @param cluster
+   *          The topology of the cluster that corresponds to the jobs in the
+   *          trace. The argument can be null if we do not have knowledge of the
+   *          cluster topology.
+   * @param conf
+   * @param randomSeed
+   *          the master seed used to derive deterministic per-job seeds.
+   * @throws IOException
+   */
+  public ZombieJobProducer(Path path, ZombieCluster cluster,
+      Configuration conf, long randomSeed) throws IOException {
+    this(new JobTraceReader(path, conf), cluster, true, randomSeed);
+  }
+  
   /**
    * Constructor
    * 
@@ -65,13 +91,39 @@ public class ZombieJobProducer implement
    */
   public ZombieJobProducer(InputStream input, ZombieCluster cluster)
       throws IOException {
-    this(new JobTraceReader(input), cluster);
+    this(new JobTraceReader(input), cluster, false, -1);
+  }
+
+  /**
+   * Constructor
+   * 
+   * @param input
+   *          The input stream for the JSON trace.
+   * @param cluster
+   *          The topology of the cluster that corresponds to the jobs in the
+   *          trace. The argument can be null if we do not have knowledge of the
+   *          cluster topology.
+   * @param randomSeed
+   *          the master seed used to derive deterministic per-job seeds.
+   * @throws IOException
+   */
+  public ZombieJobProducer(InputStream input, ZombieCluster cluster,
+      long randomSeed) throws IOException {
+    this(new JobTraceReader(input), cluster, true, randomSeed);
   }
 
   @Override
   public ZombieJob getNextJob() throws IOException {
     LoggedJob job = reader.getNext();
-    return (job == null) ? null : new ZombieJob(job, cluster);
+    if (job == null) {
+      return null;
+    } else if (hasRandomSeed) {
+      long subRandomSeed = RandomSeedGenerator.getSeed(
+            "forZombieJob" + job.getJobID(), randomSeed);
+      return new ZombieJob(job, cluster, subRandomSeed);
+    } else {
+      return new ZombieJob(job, cluster);
+    }
   }
 
   @Override

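Taken together, the new constructors make trace replay reproducible: two producers built over the same trace with the same master seed hand out identical ZombieJobs, since each job's sub-seed is derived deterministically from the master seed and the jobID. A usage sketch, assuming ZombieJob#getLoggedJob() and the producer's close() from the surrounding class (neither is shown in this hunk); the trace path is a placeholder, and a null cluster is permitted per the javadoc above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.rumen.ZombieJob;
    import org.apache.hadoop.tools.rumen.ZombieJobProducer;

    public class ReplayTrace {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Fixed master seed: re-running over the same trace yields the same jobs.
        ZombieJobProducer producer =
            new ZombieJobProducer(new Path("file:///tmp/job-trace.json"), null, conf, 42L);
        try {
          ZombieJob job;
          while ((job = producer.getNextJob()) != null) {
            System.out.println(job.getLoggedJob().getJobID());
          }
        } finally {
          producer.close();
        }
      }
    }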

