hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r597602 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/jobcontrol/
Date Fri, 23 Nov 2007 08:58:30 GMT
Author: acmurthy
Date: Fri Nov 23 00:58:29 2007
New Revision: 597602

URL: http://svn.apache.org/viewvc?rev=597602&view=rev
Log:
HADOOP-2245.  Fix LocalJobRunner to generate unique mapids across jobs by incorporating the
jobid. Also adds a test-case for JobControl vis-a-vis the LocalJobRunner. Contributed by Adrian
Woodhead.

Added:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/JobControlTestUtils.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestLocalJobControl.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=597602&r1=597601&r2=597602&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Nov 23 00:58:29 2007
@@ -139,6 +139,10 @@
     java.lang.ArithmeticException. The fix is to initialize bytesPerChecksum
     to 0.  (Michael Bieniosek via ddas)
 
+    HADOOP-2245.  Fix LocalJobRunner to generate unique mapids across jobs by
+    incorporating the jobid. Also adds a test-case for JobControl vis-a-vis the 
+    LocalJobRunner. (Adrian Woodhead via acmurthy)
+
 
 Release 0.15.1 - 2007-11-27
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=597602&r1=597601&r2=597602&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Nov 23 00:58:29 2007
@@ -114,7 +114,7 @@
         }
         DataOutputBuffer buffer = new DataOutputBuffer();
         for (int i = 0; i < splits.length; i++) {
-          String mapId = "map_" + idFormat.format(i); 
+          String mapId = jobId + "_map_" + idFormat.format(i);
           mapIds.add(mapId);
           buffer.reset();
           splits[i].write(buffer);

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/JobControlTestUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/JobControlTestUtils.java?rev=597602&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/JobControlTestUtils.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/JobControlTestUtils.java Fri Nov 23 00:58:29 2007
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.jobcontrol;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Utility methods used in various Job Control unit tests.
+ */
+public class JobControlTestUtils {
+
+  /** Number of lines written by {@link #generateData(FileSystem, Path)}. */
+  private static final int LINES_PER_DATA_FILE = 10000;
+
+  /** Minimum number of words in each generated line. */
+  private static final int MIN_WORDS_PER_LINE = 20;
+
+  /** Number of possible line lengths, starting at MIN_WORDS_PER_LINE. */
+  private static final int WORDS_PER_LINE_RANGE = 7;
+
+  private static final Random rand = new Random();
+
+  // NumberFormat is not thread-safe; these helpers presumably run on a
+  // single test thread.
+  private static final NumberFormat idFormat = NumberFormat.getInstance();
+
+  static {
+    idFormat.setMinimumIntegerDigits(4);
+    idFormat.setGroupingUsed(false);
+  }
+
+  /**
+   * Cleans the data from the passed Path in the passed FileSystem. The
+   * boolean result of FileSystem#delete is ignored.
+   *
+   * @param fs FileSystem to delete data from.
+   * @param dirPath Path to be deleted.
+   * @throws IOException If an error occurs cleaning the data.
+   */
+  static void cleanData(FileSystem fs, Path dirPath) throws IOException {
+    fs.delete(dirPath);
+  }
+
+  /**
+   * Generates a word made of the decimal digits of a random long, padded to
+   * at least four digits; negative values keep their minus sign.
+   *
+   * @return A random string.
+   */
+  private static String generateRandomWord() {
+    return idFormat.format(rand.nextLong());
+  }
+
+  /**
+   * Generates a line of random words, each followed by a space, terminated
+   * by a newline. A line holds MIN_WORDS_PER_LINE to
+   * MIN_WORDS_PER_LINE + WORDS_PER_LINE_RANGE - 1 words (20 to 26).
+   *
+   * @return A line of random text.
+   */
+  private static String generateRandomLine() {
+    // nextInt(bound) is never negative, unlike nextLong() % bound, so no line
+    // is shorter than MIN_WORDS_PER_LINE words.
+    int wordCount = MIN_WORDS_PER_LINE + rand.nextInt(WORDS_PER_LINE_RANGE);
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < wordCount; i++) {
+      sb.append(generateRandomWord()).append(' ');
+    }
+    sb.append('\n');
+    return sb.toString();
+  }
+
+  /**
+   * Generates data that can be used for Job Control tests: a single file
+   * named data.txt under dirPath holding LINES_PER_DATA_FILE lines of random
+   * text encoded as UTF-8.
+   *
+   * @param fs FileSystem to create data in.
+   * @param dirPath Path to create the data in.
+   * @throws IOException If an error occurs creating the data.
+   */
+  static void generateData(FileSystem fs, Path dirPath) throws IOException {
+    FSDataOutputStream out = fs.create(new Path(dirPath, "data.txt"));
+    try {
+      for (int i = 0; i < LINES_PER_DATA_FILE; i++) {
+        out.write(generateRandomLine().getBytes("UTF-8"));
+      }
+    } finally {
+      // Close even when a write fails so the stream is not leaked.
+      out.close();
+    }
+  }
+
+  /**
+   * Creates a simple copy job that runs DataCopy as both mapper and reducer.
+   *
+   * @param indirs List of input directories; must not be empty.
+   * @param outdir Output directory.
+   * @return JobConf initialised for a simple copy job.
+   * @throws IllegalArgumentException If indirs is empty.
+   * @throws Exception If an error occurs creating job configuration.
+   */
+  static JobConf createCopyJob(List<Path> indirs, Path outdir) throws Exception {
+    if (indirs.isEmpty()) {
+      throw new IllegalArgumentException("indirs must contain at least one Path");
+    }
+
+    Configuration defaults = new Configuration();
+    JobConf theJob = new JobConf(defaults, TestJobControl.class);
+    theJob.setJobName("DataMoveJob");
+
+    theJob.setInputPath(indirs.get(0));
+    for (int i = 1; i < indirs.size(); i++) {
+      theJob.addInputPath(indirs.get(i));
+    }
+    theJob.setMapperClass(DataCopy.class);
+    theJob.setOutputPath(outdir);
+    theJob.setOutputKeyClass(Text.class);
+    theJob.setOutputValueClass(Text.class);
+    theJob.setReducerClass(DataCopy.class);
+    theJob.setNumMapTasks(12);
+    theJob.setNumReduceTasks(4);
+    return theJob;
+  }
+
+  /**
+   * Simple Mapper and Reducer implementation which copies data it reads in.
+   * map re-emits each value under its key converted to Text; reduce emits
+   * every value under an empty Text key.
+   */
+  public static class DataCopy extends MapReduceBase implements Mapper, Reducer {
+    public void map(WritableComparable key, Writable value,
+        OutputCollector output, Reporter reporter) throws IOException {
+      output.collect(new Text(key.toString()), value);
+    }
+
+    public void reduce(WritableComparable key, Iterator values,
+        OutputCollector output, Reporter reporter) throws IOException {
+      Text dumbKey = new Text("");
+      while (values.hasNext()) {
+        Text data = (Text) values.next();
+        output.collect(dumbKey, data);
+      }
+    }
+  }
+
+}

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java?rev=597602&r1=597601&r2=597602&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java Fri Nov 23 00:58:29 2007
@@ -139,11 +139,9 @@
    * Then it creates a JobControl object and add the 4 jobs to the JobControl object.
    * Finally, it creates a thread to run the JobControl object and monitors/reports
    * the job states.
-   * 
-   * @param args
    */
   public static void doJobControlTest() throws Exception {
-        
+
     Configuration defaults = new Configuration();
     FileSystem fs = FileSystem.get(defaults);
     Path rootDataDir = new Path(System.getProperty("test.build.data", "."), "TestJobControlData");
@@ -153,29 +151,29 @@
     Path outdir_3 = new Path(rootDataDir, "outdir_3");
     Path outdir_4 = new Path(rootDataDir, "outdir_4");
 
-    cleanData(fs, indir);
-    generateData(fs, indir);
+    JobControlTestUtils.cleanData(fs, indir);
+    JobControlTestUtils.generateData(fs, indir);
 
-    cleanData(fs, outdir_1);
-    cleanData(fs, outdir_2);
-    cleanData(fs, outdir_3);
-    cleanData(fs, outdir_4);
+    JobControlTestUtils.cleanData(fs, outdir_1);
+    JobControlTestUtils.cleanData(fs, outdir_2);
+    JobControlTestUtils.cleanData(fs, outdir_3);
+    JobControlTestUtils.cleanData(fs, outdir_4);
 
     ArrayList<Job> dependingJobs = null;
 
     ArrayList<Path> inPaths_1 = new ArrayList<Path>();
     inPaths_1.add(indir);
-    JobConf jobConf_1 = createCopyJob(inPaths_1, outdir_1);
+    JobConf jobConf_1 = JobControlTestUtils.createCopyJob(inPaths_1, outdir_1);
     Job job_1 = new Job(jobConf_1, dependingJobs);
     ArrayList<Path> inPaths_2 = new ArrayList<Path>();
     inPaths_2.add(indir);
-    JobConf jobConf_2 = createCopyJob(inPaths_2, outdir_2);
+    JobConf jobConf_2 = JobControlTestUtils.createCopyJob(inPaths_2, outdir_2);
     Job job_2 = new Job(jobConf_2, dependingJobs);
 
     ArrayList<Path> inPaths_3 = new ArrayList<Path>();
     inPaths_3.add(outdir_1);
     inPaths_3.add(outdir_2);
-    JobConf jobConf_3 = createCopyJob(inPaths_3, outdir_3);
+    JobConf jobConf_3 = JobControlTestUtils.createCopyJob(inPaths_3, outdir_3);
     dependingJobs = new ArrayList<Job>();
     dependingJobs.add(job_1);
     dependingJobs.add(job_2);
@@ -183,7 +181,7 @@
 
     ArrayList<Path> inPaths_4 = new ArrayList<Path>();
     inPaths_4.add(outdir_3);
-    JobConf jobConf_4 = createCopyJob(inPaths_4, outdir_4);
+    JobConf jobConf_4 = JobControlTestUtils.createCopyJob(inPaths_4, outdir_4);
     dependingJobs = new ArrayList<Job>();
     dependingJobs.add(job_3);
     Job job_4 = new Job(jobConf_4, dependingJobs);

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestLocalJobControl.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestLocalJobControl.java?rev=597602&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestLocalJobControl.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestLocalJobControl.java Fri Nov 23 00:58:29 2007
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.jobcontrol;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * HadoopTestCase that runs a JobControl workflow against the local job runner.
+ */
+public class TestLocalJobControl extends HadoopTestCase {
+
+  public static final Log LOG = LogFactory.getLog(TestLocalJobControl.class
+      .getName());
+
+  /** Milliseconds to wait between polls of the JobControl state. */
+  private static final long POLL_INTERVAL_MS = 5000;
+
+  /**
+   * Initialises a new instance of this test case to use a Local MR cluster and
+   * a local filesystem.
+   *
+   * @throws IOException If an error occurs initialising this object.
+   */
+  public TestLocalJobControl() throws IOException {
+    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 2, 2);
+  }
+
+  /**
+   * Tests the JobControl class with the local job runner. It first cleans all
+   * the dirs it will use. Then it generates some random text data in
+   * TestLocalJobControlData/indir. Then it creates 4 jobs:
+   * <ul>
+   * <li>Job 1: copy data from indir to outdir_1</li>
+   * <li>Job 2: copy data from indir to outdir_2</li>
+   * <li>Job 3: copy data from outdir_1 and outdir_2 to outdir_3</li>
+   * <li>Job 4: copy data from outdir_3 to outdir_4</li>
+   * </ul>
+   * Jobs 1 and 2 have no dependency. Job 3 depends on jobs 1 and 2. Job 4
+   * depends on job 3.
+   *
+   * Then it creates a JobControl object, adds the 4 jobs to it, runs it in a
+   * separate thread, and polls the job states until all jobs have finished.
+   * The test fails if any job ends up in the failed state.
+   */
+  public void testLocalJobControlDataCopy() throws Exception {
+
+    FileSystem fs = FileSystem.get(createJobConf());
+    Path rootDataDir = new Path(System.getProperty("test.build.data", "."),
+        "TestLocalJobControlData");
+    Path indir = new Path(rootDataDir, "indir");
+    Path outdir_1 = new Path(rootDataDir, "outdir_1");
+    Path outdir_2 = new Path(rootDataDir, "outdir_2");
+    Path outdir_3 = new Path(rootDataDir, "outdir_3");
+    Path outdir_4 = new Path(rootDataDir, "outdir_4");
+
+    JobControlTestUtils.cleanData(fs, indir);
+    JobControlTestUtils.generateData(fs, indir);
+
+    JobControlTestUtils.cleanData(fs, outdir_1);
+    JobControlTestUtils.cleanData(fs, outdir_2);
+    JobControlTestUtils.cleanData(fs, outdir_3);
+    JobControlTestUtils.cleanData(fs, outdir_4);
+
+    ArrayList<Job> dependingJobs = null;
+
+    ArrayList<Path> inPaths_1 = new ArrayList<Path>();
+    inPaths_1.add(indir);
+    JobConf jobConf_1 = JobControlTestUtils.createCopyJob(inPaths_1, outdir_1);
+    Job job_1 = new Job(jobConf_1, dependingJobs);
+    ArrayList<Path> inPaths_2 = new ArrayList<Path>();
+    inPaths_2.add(indir);
+    JobConf jobConf_2 = JobControlTestUtils.createCopyJob(inPaths_2, outdir_2);
+    Job job_2 = new Job(jobConf_2, dependingJobs);
+
+    ArrayList<Path> inPaths_3 = new ArrayList<Path>();
+    inPaths_3.add(outdir_1);
+    inPaths_3.add(outdir_2);
+    JobConf jobConf_3 = JobControlTestUtils.createCopyJob(inPaths_3, outdir_3);
+    dependingJobs = new ArrayList<Job>();
+    dependingJobs.add(job_1);
+    dependingJobs.add(job_2);
+    Job job_3 = new Job(jobConf_3, dependingJobs);
+
+    ArrayList<Path> inPaths_4 = new ArrayList<Path>();
+    inPaths_4.add(outdir_3);
+    JobConf jobConf_4 = JobControlTestUtils.createCopyJob(inPaths_4, outdir_4);
+    dependingJobs = new ArrayList<Job>();
+    dependingJobs.add(job_3);
+    Job job_4 = new Job(jobConf_4, dependingJobs);
+
+    JobControl theControl = new JobControl("Test");
+    theControl.addJob(job_1);
+    theControl.addJob(job_2);
+    theControl.addJob(job_3);
+    theControl.addJob(job_4);
+
+    Thread theController = new Thread(theControl);
+    theController.start();
+    try {
+      while (!theControl.allFinished()) {
+        LOG.debug("Jobs in waiting state: " + theControl.getWaitingJobs().size());
+        LOG.debug("Jobs in ready state: " + theControl.getReadyJobs().size());
+        LOG.debug("Jobs in running state: " + theControl.getRunningJobs().size());
+        LOG.debug("Jobs in success state: "
+            + theControl.getSuccessfulJobs().size());
+        LOG.debug("Jobs in failed state: " + theControl.getFailedJobs().size());
+        LOG.debug("\n");
+        // An interrupt propagates as a test error instead of being swallowed.
+        Thread.sleep(POLL_INTERVAL_MS);
+      }
+
+      assertEquals("Some jobs failed", 0, theControl.getFailedJobs().size());
+    } finally {
+      // Ask the JobControl to stop even when the assertion above fails.
+      theControl.stop();
+    }
+  }
+
+}



Mime
View raw message