hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r812088 - in /hadoop/mapreduce/trunk: ./ src/contrib/sqoop/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapred/
Date Mon, 07 Sep 2009 11:28:43 GMT
Author: tomwhite
Date: Mon Sep  7 11:28:42 2009
New Revision: 812088

URL: http://svn.apache.org/viewvc?rev=812088&view=rev
Log:
MAPREDUCE-876. Sqoop import of large tables can time out. Contributed by Aaron Kimball.

Added:
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/AutoProgressMapRunner.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapred/
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapred/TestAutoProgressMapRunner.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=812088&r1=812087&r2=812088&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Sep  7 11:28:42 2009
@@ -292,6 +292,9 @@
     MAPREDUCE-898. Changes DistributedCache to use the new API.
     (Amareshwari Sriramadasu via ddas)
 
+    MAPREDUCE-876. Sqoop import of large tables can time out.
+    (Aaron Kimball via tomwhite)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml?rev=812088&r1=812087&r2=812088&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml Mon Sep  7 11:28:42 2009
@@ -68,5 +68,17 @@
       name="avro"
       rev="1.0.0"
       conf="common->default"/>
+    <dependency org="javax.servlet"
+      name="servlet-api"
+      rev="${servlet-api.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty"
+      rev="${jetty.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty-util"
+      rev="${jetty-util.version}"
+      conf="common->master"/>
     </dependencies>
 </ivy-module>

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/AutoProgressMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/AutoProgressMapRunner.java?rev=812088&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/AutoProgressMapRunner.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/AutoProgressMapRunner.java Mon Sep  7 11:28:42 2009
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapRunnable;
+import org.apache.hadoop.mapred.MapRunner;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * MapRunnable implementation that spawns a background thread
+ * to periodically increment the progress of the operation.
+ * This is used because queries to the database can be expected
+ * to block for more than 10 minutes when performing the initial
+ * record input.
+ *
+ * The background thread can be configured to stop providing
+ * progress after a fixed period of time; after this time period,
+ * some other means (e.g., emitting records) must be used to sustain
+ * Mapper progress.
+ */
+public class AutoProgressMapRunner<K1, V1, K2, V2>
+    extends MapRunner<K1, V1, K2, V2> implements MapRunnable<K1, V1, K2, V2>
{
+
+  public static final Log LOG = LogFactory.getLog(AutoProgressMapRunner.class.getName());
+
+  /** Total number of millis for which progress will be reported
+      by the auto-progress thread. If this is zero, then the auto-progress
+      thread will never voluntarily exit.
+    */
+  private int maxProgressPeriod;
+
+  /** Number of milliseconds to sleep for between loop iterations. Must be less
+      than report interval.
+    */
+  private int sleepInterval;
+
+  /** Number of milliseconds between calls to Reporter.progress(). Should be a multiple
+      of the sleepInterval.
+    */
+  private int reportInterval;
+
+  public static final String MAX_PROGRESS_PERIOD_KEY = "sqoop.mapred.auto.progress.max";
+  public static final String SLEEP_INTERVAL_KEY = "sqoop.mapred.auto.progress.sleep";
+  public static final String REPORT_INTERVAL_KEY = "sqoop.mapred.auto.progress.report";
+
+  // Sleep for 10 seconds at a time.
+  static final int DEFAULT_SLEEP_INTERVAL = 10000;
+
+  // Report progress every 30 seconds.
+  static final int DEFAULT_REPORT_INTERVAL = 30000;
+
+  // Disable max progress, by default.
+  static final int DEFAULT_MAX_PROGRESS = 0;
+
+  private class ProgressThread extends Thread {
+
+    private boolean keepGoing; // while this is true, thread runs.
+    private Reporter reporter;
+    private long startTimeMillis;
+    private long lastReportMillis;
+
+    public ProgressThread(final Reporter r) {
+      this.reporter = r;
+      this.keepGoing = true;
+    }
+
+    public void signalShutdown() {
+      synchronized(this) {
+        // Synchronize this to ensure a fence before interrupt.
+        this.keepGoing = false;
+      }
+      this.interrupt();
+    }
+
+    public void run() {
+      boolean doKeepGoing = true;
+
+      this.lastReportMillis = System.currentTimeMillis();
+      this.startTimeMillis = this.lastReportMillis;
+
+      final long MAX_PROGRESS = AutoProgressMapRunner.this.maxProgressPeriod;
+      final long REPORT_INTERVAL = AutoProgressMapRunner.this.reportInterval;
+      final long SLEEP_INTERVAL = AutoProgressMapRunner.this.sleepInterval;
+
+      // in a loop:
+      //   * Check that we haven't run for too long (maxProgressPeriod)
+      //   * If it's been a report interval since we last made progress, make more.
+      //   * Sleep for a bit.
+      //   * If the parent thread has signaled for exit, do so.
+      while (doKeepGoing) {
+        long curTimeMillis = System.currentTimeMillis();
+
+        if (MAX_PROGRESS != 0 && curTimeMillis - this.startTimeMillis > MAX_PROGRESS)
{
+          synchronized(this) {
+            this.keepGoing = false;
+          }
+          LOG.info("Auto-progress thread exiting after " + MAX_PROGRESS + " ms.");
+          break;
+        }
+
+        if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) {
+          // It's been a full report interval -- claim progress.
+          LOG.debug("Auto-progress thread reporting progress");
+          this.reporter.progress();
+          this.lastReportMillis = curTimeMillis;
+        }
+
+        // Unless we got an interrupt while we were working,
+        // sleep a bit before doing more work.
+        if (!this.interrupted()) {
+          try {
+            Thread.sleep(SLEEP_INTERVAL);
+          } catch (InterruptedException ie) {
+            // we were notified on something; not necessarily an error.
+          }
+        }
+
+        synchronized(this) {
+          // Read shared field in a synchronized block.
+          doKeepGoing = this.keepGoing;
+        }
+      }
+
+      LOG.info("Auto-progress thread is finished. keepGoing=" + doKeepGoing);
+    }
+  }
+
+  public void configure(JobConf job) {
+    this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY, DEFAULT_MAX_PROGRESS);
+    this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY, DEFAULT_SLEEP_INTERVAL);
+    this.reportInterval = job.getInt(REPORT_INTERVAL_KEY, DEFAULT_REPORT_INTERVAL);
+
+    if (this.reportInterval < 1) {
+      LOG.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to " + DEFAULT_REPORT_INTERVAL);
+      this.reportInterval = DEFAULT_REPORT_INTERVAL;
+    }
+
+    if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) {
+      LOG.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to " + DEFAULT_SLEEP_INTERVAL);
+      this.sleepInterval = DEFAULT_SLEEP_INTERVAL;
+    }
+
+    if (this.maxProgressPeriod < 0) {
+      LOG.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to " + DEFAULT_MAX_PROGRESS);
+      this.maxProgressPeriod = DEFAULT_MAX_PROGRESS;
+    }
+
+    super.configure(job);
+  }
+
+  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
Reporter reporter)
+      throws IOException {
+
+    ProgressThread thread = this.new ProgressThread(reporter);
+
+    try {
+      thread.setDaemon(true);
+      thread.start();
+      // Use default MapRunner to actually drive the mapping.
+      super.run(input, output, reporter);
+    } finally {
+      // Tell the progress thread to exit..
+      LOG.debug("Instructing auto-progress thread to quit.");
+      thread.signalShutdown();
+      try {
+        // And wait for that to happen.
+        LOG.debug("Waiting for progress thread shutdown...");
+        thread.join();
+        LOG.debug("Progress thread shutdown detected.");
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted when waiting on auto-progress thread: " + ie.toString());
+      }
+    }
+  }
+}
+

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java?rev=812088&r1=812087&r2=812088&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java Mon Sep  7 11:28:42 2009
@@ -114,6 +114,7 @@
       job.setNumReduceTasks(0);
       job.setNumMapTasks(1);
       job.setInputFormat(DBInputFormat.class);
+      job.setMapRunnerClass(AutoProgressMapRunner.class);
 
       FileOutputFormat.setOutputPath(job, outputPath);
 
@@ -130,7 +131,7 @@
       if (null == colNames) {
         colNames = mgr.getColumnNames(tableName);
       }
-      
+
       // It's ok if the where clause is null in DBInputFormat.setInput.
       String whereClause = options.getWhereClause();
 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java?rev=812088&r1=812087&r2=812088&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java Mon Sep  7 11:28:42 2009
@@ -23,6 +23,7 @@
 import org.apache.hadoop.sqoop.lib.TestRecordParser;
 import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
 import org.apache.hadoop.sqoop.manager.TestSqlManager;
+import org.apache.hadoop.sqoop.mapred.TestAutoProgressMapRunner;
 import org.apache.hadoop.sqoop.orm.TestClassWriter;
 import org.apache.hadoop.sqoop.orm.TestParseMethods;
 
@@ -41,6 +42,7 @@
   public static Test suite() {
     TestSuite suite = new TestSuite("Tests for org.apache.hadoop.sqoop");
 
+    suite.addTestSuite(TestAutoProgressMapRunner.class);
     suite.addTestSuite(TestAllTables.class);
     suite.addTestSuite(TestHsqldbManager.class);
     suite.addTestSuite(TestSqlManager.class);

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapred/TestAutoProgressMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapred/TestAutoProgressMapRunner.java?rev=812088&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapred/TestAutoProgressMapRunner.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapred/TestAutoProgressMapRunner.java Mon Sep  7 11:28:42 2009
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.manager.ManagerFactory;
+
+import junit.framework.TestCase;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Test the AutoProgressMapRunner implementation and prove that it makes
+ * progress updates when the mapper itself isn't.
+ */
+public class TestAutoProgressMapRunner extends TestCase {
+
+  /** Parameter: how long should each map() call sleep for? */
+  public static final String MAPPER_SLEEP_INTERVAL_KEY = "sqoop.test.mapper.sleep.ival";
+
+
+  /** Mapper that just sleeps for a configurable amount of time. */
+  public static class SleepingMapper<K1, V1, K2, V2> extends MapReduceBase
+      implements Mapper<K1, V1, K2, V2> {
+
+    private int sleepInterval;
+
+    public void configure(JobConf job) {
+      this.sleepInterval = job.getInt(MAPPER_SLEEP_INTERVAL_KEY, 100);
+    }
+
+    public void map(K1 k, V1 v, OutputCollector<K2, V2> out, Reporter r) throws IOException
{
+      while (true) {
+        try {
+          Thread.sleep(this.sleepInterval);
+          break;
+        } catch (InterruptedException ie) {
+        }
+      }
+    }
+  }
+
+  /** Mapper that sleeps for 1 second then fails. */
+  public static class FailingMapper<K1, V1, K2, V2> extends MapReduceBase
+      implements Mapper<K1, V1, K2, V2> {
+
+    public void map(K1 k, V1 v, OutputCollector<K2, V2> out, Reporter r) throws IOException
{
+      throw new IOException("Causing job failure.");
+    }
+  }
+
+  private final Path inPath  = new Path("./input");
+  private final Path outPath = new Path("./output");
+
+  private MiniMRCluster mr = null;
+  private FileSystem fs = null;
+
+  private final static int NUM_NODES = 1;
+
+  public void setUp() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set("fs.default.name", "file:///");
+    fs = FileSystem.get(conf);
+    mr = new MiniMRCluster(NUM_NODES, fs.getUri().toString(), 1, null, null, new JobConf(conf));
+
+    // Create a file to use as a dummy input
+    DataOutputStream os = fs.create(new Path(inPath, "part-0"));
+    os.writeBytes("This is a line of text.");
+    os.close();
+  }
+
+  public void tearDown() throws IOException {
+    if (null != fs) {
+      fs.delete(inPath, true);
+      fs.delete(outPath, true);
+    }
+
+    if (null != mr) {
+      mr.shutdown();
+      this.mr = null;
+    }
+  }
+
+
+  /**
+   * Test that even if the mapper just sleeps, the auto-progress thread keeps it all alive
+   */
+  public void testBackgroundProgress() throws IOException {
+    // Report progress every 2.5 seconds.
+    final int REPORT_INTERVAL = 2500;
+
+    // Tasks need to report progress once every ten seconds.
+    final int TASK_KILL_TIMEOUT = 4 * REPORT_INTERVAL;
+
+    // Create and run the job.
+    JobConf job = mr.createJobConf();
+
+    // Set the task timeout to be pretty strict.
+    job.setInt("mapred.task.timeout", TASK_KILL_TIMEOUT);
+
+    // Set the mapper itself to block for long enough that it should be killed on its own.
+    job.setInt(MAPPER_SLEEP_INTERVAL_KEY, 2 * TASK_KILL_TIMEOUT);
+
+    // Report progress frequently..
+    job.setInt(AutoProgressMapRunner.SLEEP_INTERVAL_KEY, REPORT_INTERVAL);
+    job.setInt(AutoProgressMapRunner.REPORT_INTERVAL_KEY, REPORT_INTERVAL);
+
+    job.setMapRunnerClass(AutoProgressMapRunner.class);
+    job.setMapperClass(SleepingMapper.class);
+
+    job.setNumReduceTasks(0);
+    job.setNumMapTasks(1);
+
+    FileInputFormat.addInputPath(job, inPath);
+    FileOutputFormat.setOutputPath(job, outPath);
+
+    RunningJob runningJob = JobClient.runJob(job);
+    runningJob.waitForCompletion();
+
+    assertEquals("Sleep job failed!", JobStatus.SUCCEEDED, runningJob.getJobState());
+  }
+
+  /** Test that if the mapper bails early, we shut down the progress thread
+      in a timely fashion.
+    */
+  public void testEarlyExit() throws IOException {
+    JobConf job = mr.createJobConf();
+
+    final int REPORT_INTERVAL = 30000;
+
+    job.setInt(AutoProgressMapRunner.SLEEP_INTERVAL_KEY, REPORT_INTERVAL);
+    job.setInt(AutoProgressMapRunner.REPORT_INTERVAL_KEY, REPORT_INTERVAL);
+
+    job.setNumReduceTasks(0);
+    job.setNumMapTasks(1);
+
+    job.setInt("mapred.map.max.attempts", 1);
+
+    job.setMapRunnerClass(AutoProgressMapRunner.class);
+    job.setMapperClass(FailingMapper.class);
+
+    FileInputFormat.addInputPath(job, inPath);
+    FileOutputFormat.setOutputPath(job, outPath);
+
+    RunningJob runningJob = null;
+    long startTime = System.currentTimeMillis();
+    try {
+      runningJob = JobClient.runJob(job);
+      runningJob.waitForCompletion();
+      assertEquals("Failing job succeded!", JobStatus.FAILED, runningJob.getJobState());
+    } catch(IOException ioe) {
+      // Expected
+    }
+
+    long endTime = System.currentTimeMillis();
+    long duration = endTime - startTime;
+
+    assertTrue("Job took too long to clean up (" + duration + ")",
+        duration < (REPORT_INTERVAL * 2));
+  }
+}



Mime
View raw message