hadoop-mapreduce-commits mailing list archives

From tomwh...@apache.org
Subject svn commit: r816240 - in /hadoop/mapreduce/trunk: ./ src/contrib/sqoop/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/ src/co...
Date Thu, 17 Sep 2009 15:37:22 GMT
Author: tomwhite
Date: Thu Sep 17 15:37:22 2009
New Revision: 816240

URL: http://svn.apache.org/viewvc?rev=816240&view=rev
Log:
MAPREDUCE-907. Sqoop should use more intelligent splits. Contributed by Aaron Kimball.

Added:
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/RawKeyTextOutputFormat.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestMultiMaps.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestSplitBy.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapred/MapredTests.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapreduce/
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapreduce/MapreduceTests.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java
Removed:
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestOrderBy.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml
    hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestAllTables.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestWhere.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=816240&r1=816239&r2=816240&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Sep 17 15:37:22 2009
@@ -355,6 +355,9 @@
     MAPREDUCE-284. Enables ipc.client.tcpnodelay in Tasktracker's Child.
     (Ravi Gummadi via sharad)
 
+    MAPREDUCE-907. Sqoop should use more intelligent splits. (Aaron Kimball
+    via tomwhite)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml?rev=816240&r1=816239&r2=816240&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml Thu Sep 17 15:37:22 2009
@@ -26,6 +26,38 @@
   <import file="../build-contrib.xml"/>
   <property environment="env"/>
   <property name="sqoop.thirdparty.lib.dir" value="" />
+  <property name="mrunit.class.dir" value="${build.dir}/../mrunit/classes" />
+
+  <!-- ================================================================== -->
+  <!-- Compile test code                                                  -->
+  <!-- Override with our own version so we can enforce build dependencies -->
+  <!-- on compile-mapred-test for MiniMRCluster, and MRUnit.              -->
+  <!-- ================================================================== -->
+  <target name="compile-test" depends="compile-examples" if="test.available">
+    <echo message="Compiling ${name} dependencies" />
+    <!-- need top-level compile-mapred-test for MiniMRCluster -->
+    <subant target="compile-mapred-test">
+      <fileset dir="../../.." includes="build.xml" />
+    </subant>
+
+    <!-- Need MRUnit compiled for some tests -->
+    <subant target="compile">
+      <fileset dir="../mrunit" includes="build.xml" />
+    </subant>
+
+    <echo message="contrib: ${name}"/>
+    <javac
+     encoding="${build.encoding}"
+     srcdir="${src.test}"
+     includes="**/*.java"
+     destdir="${build.test}"
+     debug="${javac.debug}">
+    <classpath>
+      <path refid="test.classpath"/>
+      <pathelement path="${mrunit.class.dir}" />
+    </classpath>
+    </javac>
+  </target>
 
   <!-- ================================================================== -->
   <!-- Run unit tests                                                     -->
@@ -54,6 +86,11 @@
       <sysproperty key="build.test" value="${build.test}"/>
       <sysproperty key="contrib.name" value="${name}"/>
 
+
+      <!-- define this property to force Sqoop to throw better exceptions on errors
+           during testing, instead of printing a short message and exiting with status 1. -->
+      <sysproperty key="sqoop.throwOnError" value="" />
+
       <!--
            Added property needed to use the .class files for compilation
            instead of depending on hadoop-*-core.jar
@@ -92,13 +129,16 @@
       -->
       <sysproperty key="hive.home" value="${basedir}/testdata/hive" />
 
-      <!-- tools.jar from Sun JDK also required to invoke javac. -->
       <classpath>
         <path refid="test.classpath"/>
         <path refid="contrib-classpath"/>
+        <!-- tools.jar from Sun JDK also required to invoke javac. -->
         <pathelement path="${env.JAVA_HOME}/lib/tools.jar" />
+        <!-- need thirdparty JDBC drivers for thirdparty tests -->
         <fileset dir="${sqoop.thirdparty.lib.dir}"
             includes="*.jar" />
+        <!-- need MRUnit for some tests -->
+        <pathelement path="${mrunit.class.dir}" />
       </classpath>
       <formatter type="${test.junit.output.format}" />
       <batchtest todir="${build.test}" unless="testcase">

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml?rev=816240&r1=816239&r2=816240&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml Thu Sep 17 15:37:22 2009
@@ -64,6 +64,18 @@
       name="hsqldb"
       rev="${hsqldb.version}"
       conf="common->default"/>
+    <dependency org="javax.servlet"
+      name="servlet-api"
+      rev="${servlet-api.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty"
+      rev="${jetty.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty-util"
+      rev="${jetty-util.version}"
+      conf="common->master"/>
     <dependency org="org.apache.hadoop"
       name="avro"
       rev="1.0.0"

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=816240&r1=816239&r2=816240&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Thu Sep 17 15:37:22 2009
@@ -87,7 +87,7 @@
   private String jarOutputDir;
   private ControlAction action;
   private String hadoopHome;
-  private String orderByCol;
+  private String splitByCol;
   private String whereClause;
   private String debugSqlCmd;
   private String driverClassName;
@@ -99,6 +99,7 @@
   private boolean hiveImport;
   private String packageName; // package to prepend to auto-named classes.
   private String className; // package+class to apply to individual table import.
+  private int numMappers;
 
   private char inputFieldDelim;
   private char inputRecordDelim;
@@ -114,6 +115,8 @@
 
   private boolean areDelimsManuallySet;
 
+  public static final int DEFAULT_NUM_MAPPERS = 4;
+
   private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
 
   public ImportOptions() {
@@ -153,7 +156,7 @@
       this.password = props.getProperty("db.password", this.password);
       this.tableName = props.getProperty("db.table", this.tableName);
       this.connectString = props.getProperty("db.connect.url", this.connectString);
-      this.orderByCol = props.getProperty("db.sort.column", this.orderByCol);
+      this.splitByCol = props.getProperty("db.split.column", this.splitByCol);
       this.whereClause = props.getProperty("db.where.clause", this.whereClause);
       this.driverClassName = props.getProperty("jdbc.driver", this.driverClassName);
       this.warehouseDir = props.getProperty("hdfs.warehouse.dir", this.warehouseDir);
@@ -227,6 +230,8 @@
 
     this.areDelimsManuallySet = false;
 
+    this.numMappers = DEFAULT_NUM_MAPPERS;
+
     loadFromProperties();
   }
 
@@ -255,7 +260,7 @@
     System.out.println("Import control options:");
     System.out.println("--table (tablename)          Table to read");
     System.out.println("--columns (col,col,col...)   Columns to export from table");
-    System.out.println("--order-by (column-name)     Column of the table used to order results");
+    System.out.println("--split-by (column-name)     Column of the table used to split work units");
     System.out.println("--where (where clause)       Where clause to use during export");
     System.out.println("--hadoop-home (dir)          Override $HADOOP_HOME");
     System.out.println("--hive-home (dir)            Override $HIVE_HOME");
@@ -263,9 +268,10 @@
     System.out.println("--as-sequencefile            Imports data to SequenceFiles");
     System.out.println("--as-textfile                Imports data as plain text (default)");
     System.out.println("--all-tables                 Import all tables in database");
-    System.out.println("                             (Ignores --table, --columns and --order-by)");
+    System.out.println("                             (Ignores --table, --columns and --split-by)");
     System.out.println("--hive-import                If set, then import the table into Hive.");
     System.out.println("                    (Uses Hive's default delimiters if none are set.)");
+    System.out.println("-m, --num-mappers (n)        Use 'n' map tasks to import in parallel");
     System.out.println("");
     System.out.println("Output line formatting options:");
     System.out.println("--fields-terminated-by (char)    Sets the field separator character");
@@ -409,8 +415,8 @@
         } else if (args[i].equals("--columns")) {
           String columnString = args[++i];
           this.columns = columnString.split(",");
-        } else if (args[i].equals("--order-by")) {
-          this.orderByCol = args[++i];
+        } else if (args[i].equals("--split-by")) {
+          this.splitByCol = args[++i];
         } else if (args[i].equals("--where")) {
           this.whereClause = args[++i];
         } else if (args[i].equals("--list-tables")) {
@@ -441,6 +447,14 @@
           this.hiveHome = args[++i];
         } else if (args[i].equals("--hive-import")) {
           this.hiveImport = true;
+        } else if (args[i].equals("--num-mappers") || args[i].equals("-m")) {
+          String numMappersStr = args[++i];
+          try {
+            this.numMappers = Integer.valueOf(numMappersStr);
+          } catch (NumberFormatException nfe) {
+            throw new InvalidOptionsException("Invalid argument; expected "
+                + args[i - 1] + " (number).");
+          }
         } else if (args[i].equals("--fields-terminated-by")) {
           this.outputFieldDelim = ImportOptions.toChar(args[++i]);
           this.areDelimsManuallySet = true;
@@ -530,9 +544,9 @@
       // If we're reading all tables in a database, can't filter column names.
       throw new InvalidOptionsException("--columns and --all-tables are incompatible options."
           + HELP_STR);
-    } else if (this.allTables && this.orderByCol != null) {
+    } else if (this.allTables && this.splitByCol != null) {
       // If we're reading all tables in a database, can't set pkey
-      throw new InvalidOptionsException("--order-by and --all-tables are incompatible options."
+      throw new InvalidOptionsException("--split-by and --all-tables are incompatible options."
           + HELP_STR);
     } else if (this.allTables && this.className != null) {
       // If we're reading all tables, can't set individual class name
@@ -544,6 +558,10 @@
     } else if (this.className != null && this.packageName != null) {
       throw new InvalidOptionsException(
           "--class-name overrides --package-name. You cannot use both." + HELP_STR);
+    } else if (this.action == ControlAction.FullImport && !this.allTables
+        && this.tableName == null) {
+      throw new InvalidOptionsException(
+          "One of --table or --all-tables is required for import." + HELP_STR);
     }
 
     if (this.hiveImport) {
@@ -594,8 +612,8 @@
     }
   }
 
-  public String getOrderByCol() {
-    return orderByCol;
+  public String getSplitByCol() {
+    return splitByCol;
   }
   
   public String getWhereClause() {
@@ -623,6 +641,13 @@
   }
 
   /**
+   * @return the number of map tasks to use for import
+   */
+  public int getNumMappers() {
+    return this.numMappers;
+  }
+
+  /**
    * @return the user-specified absolute class name for the table
    */
   public String getClassName() {

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java?rev=816240&r1=816239&r2=816240&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java Thu Sep 17 15:37:22 2009
@@ -42,6 +42,11 @@
 
   public static final Log LOG = LogFactory.getLog(Sqoop.class.getName());
 
+  /** If this System property is set, always throw an exception, do not just
+      exit with status 1.
+    */
+  public static final String SQOOP_RETHROW_PROPERTY = "sqoop.throwOnError";
+
   static {
     Configuration.addDefaultResource("sqoop-default.xml");
     Configuration.addDefaultResource("sqoop-site.xml");
@@ -112,7 +117,11 @@
       manager = new ConnFactory(getConf()).getManager(options);
     } catch (Exception e) {
       LOG.error("Got error creating database manager: " + e.toString());
-      return 1;
+      if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
+        throw new RuntimeException(e);
+      } else {
+        return 1;
+      }
     }
 
     if (options.doHiveImport()) {
@@ -167,10 +176,18 @@
         }
       } catch (IOException ioe) {
         LOG.error("Encountered IOException running import job: " + ioe.toString());
-        return 1;
+        if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
+          throw new RuntimeException(ioe);
+        } else {
+          return 1;
+        }
       } catch (ImportError ie) {
         LOG.error("Error during import: " + ie.toString());
-        return 1;
+        if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
+          throw new RuntimeException(ie);
+        } else {
+          return 1;
+        }
       }
     }
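
The sqoop.throwOnError hook above is what the build.xml change earlier wires in for the unit tests. A minimal illustrative sketch of using it directly, assuming Sqoop is driven through ToolRunner and with a hypothetical args array:

    // Illustrative sketch; assumes Sqoop is run as a Tool via ToolRunner.
    // The property only needs to be present -- even the empty string that
    // build.xml sets -- because the check is
    // System.getProperty(SQOOP_RETHROW_PROPERTY) != null.
    System.setProperty(Sqoop.SQOOP_RETHROW_PROPERTY, "");
    try {
      org.apache.hadoop.util.ToolRunner.run(new Sqoop(), args); // failures surface as RuntimeException
    } finally {
      System.clearProperty(Sqoop.SQOOP_RETHROW_PROPERTY);
    }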
 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java?rev=816240&r1=816239&r2=816240&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java Thu Sep 17 15:37:22 2009
@@ -30,6 +30,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.mapred.ImportJob;
 import org.apache.hadoop.sqoop.util.ImportError;
 
 /**
@@ -84,5 +85,27 @@
 
     return connection;
   }
+
+  /**
+   * This importTable() implementation continues to use the older DBInputFormat
+   * because DataDrivenDBInputFormat does not currently work with Oracle.
+   */
+  public void importTable(String tableName, String jarFile, Configuration conf)
+      throws IOException, ImportError {
+    ImportJob importer = new ImportJob(options);
+    String splitCol = options.getSplitByCol();
+    if (null == splitCol) {
+      // If the user didn't specify a splitting column, try to infer one.
+      splitCol = getPrimaryKey(tableName);
+    }
+
+    if (null == splitCol) {
+      // Can't infer a primary key.
+      throw new ImportError("No primary key could be found for table " + tableName
+          + ". Please specify one with --split-by.");
+    }
+
+    importer.runImport(tableName, jarFile, splitCol, conf);
+  }
 }
 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java?rev=816240&r1=816239&r2=816240&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java Thu Sep 17 15:37:22 2009
@@ -19,7 +19,7 @@
 package org.apache.hadoop.sqoop.manager;
 
 import org.apache.hadoop.sqoop.ImportOptions;
-import org.apache.hadoop.sqoop.mapred.ImportJob;
+import org.apache.hadoop.sqoop.mapreduce.DataDrivenImportJob;
 import org.apache.hadoop.sqoop.util.ImportError;
 import org.apache.hadoop.sqoop.util.ResultSetPrinter;
 
@@ -258,24 +258,24 @@
 
   /**
    * Default implementation of importTable() is to launch a MapReduce job
-   * via ImportJob to read the table with DBInputFormat.
+   * via DataDrivenImportJob to read the table with DataDrivenDBInputFormat.
    */
   public void importTable(String tableName, String jarFile, Configuration conf)
       throws IOException, ImportError {
-    ImportJob importer = new ImportJob(options);
-    String orderCol = options.getOrderByCol();
-    if (null == orderCol) {
-      // If the user didn't specify an ordering column, try to infer one.
-      orderCol = getPrimaryKey(tableName);
+    DataDrivenImportJob importer = new DataDrivenImportJob(options);
+    String splitCol = options.getSplitByCol();
+    if (null == splitCol) {
+      // If the user didn't specify a splitting column, try to infer one.
+      splitCol = getPrimaryKey(tableName);
     }
 
-    if (null == orderCol) {
+    if (null == splitCol) {
       // Can't infer a primary key.
       throw new ImportError("No primary key could be found for table " + tableName
-          + ". Please specify one with --order-by.");
+          + ". Please specify one with --split-by.");
     }
 
-    importer.runImport(tableName, jarFile, orderCol, conf);
+    importer.runImport(tableName, jarFile, splitCol, conf);
   }
 
   /**

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java?rev=816240&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/AutoProgressMapper.java Thu Sep 17 15:37:22 2009
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+
+/**
+ * Identity mapper that continuously reports progress via a background thread.
+ */
+public class AutoProgressMapper<KEYIN, VALIN, KEYOUT, VALOUT>
+    extends Mapper<KEYIN, VALIN, KEYOUT, VALOUT> {
+
+  public static final Log LOG = LogFactory.getLog(AutoProgressMapper.class.getName());
+
+  /** Total number of millis for which progress will be reported
+      by the auto-progress thread. If this is zero, then the auto-progress
+      thread will never voluntarily exit.
+    */
+  private int maxProgressPeriod;
+
+  /** Number of milliseconds to sleep for between loop iterations. Must be less
+      than report interval.
+    */
+  private int sleepInterval;
+
+  /** Number of milliseconds between calls to Reporter.progress(). Should be a multiple
+      of the sleepInterval.
+    */
+  private int reportInterval;
+
+  public static final String MAX_PROGRESS_PERIOD_KEY = "sqoop.mapred.auto.progress.max";
+  public static final String SLEEP_INTERVAL_KEY = "sqoop.mapred.auto.progress.sleep";
+  public static final String REPORT_INTERVAL_KEY = "sqoop.mapred.auto.progress.report";
+
+  // Sleep for 10 seconds at a time.
+  static final int DEFAULT_SLEEP_INTERVAL = 10000;
+
+  // Report progress every 30 seconds.
+  static final int DEFAULT_REPORT_INTERVAL = 30000;
+
+  // Disable max progress, by default.
+  static final int DEFAULT_MAX_PROGRESS = 0;
+
+  private class ProgressThread extends Thread {
+
+    private volatile boolean keepGoing; // while this is true, thread runs.
+    private Context context;
+    private long startTimeMillis;
+    private long lastReportMillis;
+
+    public ProgressThread(final Context ctxt) {
+      this.context = ctxt;
+      this.keepGoing = true;
+    }
+
+    public void signalShutdown() {
+      this.keepGoing = false; // volatile update.
+      this.interrupt();
+    }
+
+    public void run() {
+      this.lastReportMillis = System.currentTimeMillis();
+      this.startTimeMillis = this.lastReportMillis;
+
+      final long MAX_PROGRESS = AutoProgressMapper.this.maxProgressPeriod;
+      final long REPORT_INTERVAL = AutoProgressMapper.this.reportInterval;
+      final long SLEEP_INTERVAL = AutoProgressMapper.this.sleepInterval;
+
+      // in a loop:
+      //   * Check that we haven't run for too long (maxProgressPeriod)
+      //   * If it's been a report interval since we last made progress, make more.
+      //   * Sleep for a bit.
+      //   * If the parent thread has signaled for exit, do so.
+      while (this.keepGoing) {
+        long curTimeMillis = System.currentTimeMillis();
+
+        if (MAX_PROGRESS != 0 && curTimeMillis - this.startTimeMillis > MAX_PROGRESS) {
+          this.keepGoing = false;
+          LOG.info("Auto-progress thread exiting after " + MAX_PROGRESS + " ms.");
+          break;
+        }
+
+        if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) {
+          // It's been a full report interval -- claim progress.
+          LOG.debug("Auto-progress thread reporting progress");
+          this.context.progress();
+          this.lastReportMillis = curTimeMillis;
+        }
+
+        // Unless we got an interrupt while we were working,
+        // sleep a bit before doing more work.
+        if (!this.interrupted()) {
+          try {
+            Thread.sleep(SLEEP_INTERVAL);
+          } catch (InterruptedException ie) {
+            // we were notified on something; not necessarily an error.
+          }
+        }
+      }
+
+      LOG.info("Auto-progress thread is finished. keepGoing=" + this.keepGoing);
+    }
+  }
+
+  /**
+   * Set configuration parameters for the auto-progress thread.
+   */
+  private final void configureAutoProgress(Configuration job) {
+    this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY, DEFAULT_MAX_PROGRESS);
+    this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY, DEFAULT_SLEEP_INTERVAL);
+    this.reportInterval = job.getInt(REPORT_INTERVAL_KEY, DEFAULT_REPORT_INTERVAL);
+
+    if (this.reportInterval < 1) {
+      LOG.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to " + DEFAULT_REPORT_INTERVAL);
+      this.reportInterval = DEFAULT_REPORT_INTERVAL;
+    }
+
+    if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) {
+      LOG.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to " + DEFAULT_SLEEP_INTERVAL);
+      this.sleepInterval = DEFAULT_SLEEP_INTERVAL;
+    }
+
+    if (this.maxProgressPeriod < 0) {
+      LOG.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to " + DEFAULT_MAX_PROGRESS);
+      this.maxProgressPeriod = DEFAULT_MAX_PROGRESS;
+    }
+  }
+
+
+  // map() method intentionally omitted; Mapper.map() is the identity mapper.
+
+
+  /**
+   * Run the mapping process for this task, wrapped in an auto-progress system.
+   */
+  public void run(Context context) throws IOException, InterruptedException {
+    configureAutoProgress(context.getConfiguration());
+    ProgressThread thread = this.new ProgressThread(context);
+
+    try {
+      thread.setDaemon(true);
+      thread.start();
+
+      // use default run() method to actually drive the mapping.
+      super.run(context);
+    } finally {
+      // Tell the progress thread to exit..
+      LOG.debug("Instructing auto-progress thread to quit.");
+      thread.signalShutdown();
+      try {
+        // And wait for that to happen.
+        LOG.debug("Waiting for progress thread shutdown...");
+        thread.join();
+        LOG.debug("Progress thread shutdown detected.");
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted when waiting on auto-progress thread: " + ie.toString());
+      }
+    }
+  }
+
+}
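
For reference, the new mapper reads three tuning keys from the job configuration. A short sketch of setting them, where the key names and defaults come from AutoProgressMapper above and the values shown are arbitrary examples ('job' refers to the Job built in DataDrivenImportJob below):

    // Illustrative sketch; values are examples only.
    Configuration conf = job.getConfiguration();
    conf.setInt(AutoProgressMapper.MAX_PROGRESS_PERIOD_KEY, 0);  // 0 = keep reporting until the task ends
    conf.setInt(AutoProgressMapper.SLEEP_INTERVAL_KEY, 5000);    // poll every 5 seconds
    conf.setInt(AutoProgressMapper.REPORT_INTERVAL_KEY, 15000);  // call context.progress() every 15 seconds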

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java?rev=816240&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java Thu Sep 17 15:37:22 2009
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+import org.apache.hadoop.sqoop.ConnFactory;
+import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.orm.TableClassName;
+import org.apache.hadoop.sqoop.util.ClassLoaderStack;
+import org.apache.hadoop.sqoop.util.PerfCounters;
+
+/**
+ * Actually runs a jdbc import job using the ORM files generated by the sqoop.orm package.
+ * Uses DataDrivenDBInputFormat
+ */
+public class DataDrivenImportJob {
+
+  public static final Log LOG = LogFactory.getLog(DataDrivenImportJob.class.getName());
+
+  private ImportOptions options;
+
+  public DataDrivenImportJob(final ImportOptions opts) {
+    this.options = opts;
+  }
+
+  /**
+   * Run an import job to read a table in to HDFS
+   *
+   * @param tableName  the database table to read
+   * @param ormJarFile the Jar file to insert into the dcache classpath. (may be null)
+   * @param splitByCol the column of the database table to use to split the import
+   * @param conf A fresh Hadoop Configuration to use to build an MR job.
+   */
+  public void runImport(String tableName, String ormJarFile, String splitByCol,
+      Configuration conf) throws IOException {
+
+    LOG.info("Beginning data-driven import of " + tableName);
+
+    String tableClassName = new TableClassName(options).getClassForTable(tableName);
+
+    boolean isLocal = "local".equals(conf.get("mapred.job.tracker"));
+    ClassLoader prevClassLoader = null;
+    if (isLocal) {
+      // If we're using the LocalJobRunner, then instead of using the compiled jar file
+      // as the job source, we're running in the current thread. Push on another classloader
+      // that loads from that jar in addition to everything currently on the classpath.
+      prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile, tableClassName);
+    }
+
+    try {
+      Job job = new Job(conf);
+
+      // Set the external jar to use for the job.
+      job.getConfiguration().set("mapred.jar", ormJarFile);
+
+      String hdfsWarehouseDir = options.getWarehouseDir();
+      Path outputPath;
+
+      if (null != hdfsWarehouseDir) {
+        Path hdfsWarehousePath = new Path(hdfsWarehouseDir);
+        hdfsWarehousePath.makeQualified(FileSystem.get(job.getConfiguration()));
+        outputPath = new Path(hdfsWarehousePath, tableName);
+      } else {
+        outputPath = new Path(tableName);
+      }
+
+      if (options.getFileLayout() == ImportOptions.FileLayout.TextFile) {
+        job.setOutputFormatClass(RawKeyTextOutputFormat.class);
+        job.setMapperClass(TextImportMapper.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(NullWritable.class);
+      } else if (options.getFileLayout() == ImportOptions.FileLayout.SequenceFile) {
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setMapperClass(AutoProgressMapper.class);
+        SequenceFileOutputFormat.setCompressOutput(job, true);
+        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+        job.getConfiguration().set("mapred.output.value.class", tableClassName);
+      } else {
+        LOG.warn("Unknown file layout specified: " + options.getFileLayout() + "; using text.");
+      }
+
+      int numMapTasks = options.getNumMappers();
+      if (numMapTasks < 1) {
+        numMapTasks = ImportOptions.DEFAULT_NUM_MAPPERS;
+        LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
+      }
+      job.getConfiguration().setInt("mapred.map.tasks", numMapTasks);
+      job.setNumReduceTasks(0);
+
+      job.setInputFormatClass(DataDrivenDBInputFormat.class);
+
+      FileOutputFormat.setOutputPath(job, outputPath);
+
+      ConnManager mgr = new ConnFactory(conf).getManager(options);
+      String username = options.getUsername();
+      if (null == username || username.length() == 0) {
+        DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
+            options.getConnectString());
+      } else {
+        DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
+            options.getConnectString(), username, options.getPassword());
+      }
+
+      String [] colNames = options.getColumns();
+      if (null == colNames) {
+        colNames = mgr.getColumnNames(tableName);
+      }
+
+      // It's ok if the where clause is null in DBInputFormat.setInput.
+      String whereClause = options.getWhereClause();
+
+      // We can't set the class properly in here, because we may not have the
+      // jar loaded in this JVM. So we start by calling setInput() with DBWritable,
+      // and then overriding the string manually.
+      DataDrivenDBInputFormat.setInput(job, DBWritable.class, tableName, whereClause,
+          splitByCol, colNames);
+      job.getConfiguration().set(DBConfiguration.INPUT_CLASS_PROPERTY, tableClassName);
+
+      PerfCounters counters = new PerfCounters();
+      counters.startClock();
+
+      try {
+        job.waitForCompletion(false);
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException(cnfe);
+      }
+
+      counters.stopClock();
+      counters.addBytes(job.getCounters().getGroup("FileSystemCounters")
+          .findCounter("HDFS_BYTES_WRITTEN").getValue());
+      LOG.info("Transferred " + counters.toString());
+    } finally {
+      if (isLocal && null != prevClassLoader) {
+        // unload the special classloader for this jar.
+        ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
+      }
+    }
+  }
+}
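
For readers new to DataDrivenDBInputFormat, a simplified sketch of the splitting it performs for a numeric split column; the table and column names are hypothetical, and the exact SQL is generated inside the input format and varies by column type:

    // 1. The client runs one bounding query:
    //      SELECT MIN(id), MAX(id) FROM employees
    // 2. The [min, max] range is divided into mapred.map.tasks intervals.
    // 3. Each map task reads only its interval, e.g. appending
    //      WHERE id >= 2500 AND id < 5000
    //    to the table query (combined with the user's --where clause, if any),
    //    so the maps pull disjoint slices of the table in parallel.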

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/RawKeyTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/RawKeyTextOutputFormat.java?rev=816240&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/RawKeyTextOutputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/RawKeyTextOutputFormat.java Thu Sep 17 15:37:22 2009
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapreduce;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.*;
+
+/** An {@link OutputFormat} that writes plain text files.
+ * Only writes the key. Does not write any delimiter/newline after the key.
+ */
+public class RawKeyTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
+
+  protected static class RawKeyRecordWriter<K, V> extends RecordWriter<K, V> {
+    private static final String utf8 = "UTF-8";
+
+    protected DataOutputStream out;
+
+    public RawKeyRecordWriter(DataOutputStream out) {
+      this.out = out;
+    }
+
+    /**
+     * Write the object to the byte stream, handling Text as a special
+     * case.
+     * @param o the object to print
+     * @throws IOException if the write throws, we pass it on
+     */
+    private void writeObject(Object o) throws IOException {
+      if (o instanceof Text) {
+        Text to = (Text) o;
+        out.write(to.getBytes(), 0, to.getLength());
+      } else {
+        out.write(o.toString().getBytes(utf8));
+      }
+    }
+
+    public synchronized void write(K key, V value) throws IOException {
+      writeObject(key);
+    }
+
+    public synchronized void close(TaskAttemptContext context) throws IOException {
+      out.close();
+    }
+  }
+
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    boolean isCompressed = getCompressOutput(context);
+    Configuration conf = context.getConfiguration();
+    String ext = "";
+    CompressionCodec codec = null;
+
+    if (isCompressed) {
+      // create the named codec
+      Class<? extends CompressionCodec> codecClass =
+        getOutputCompressorClass(context, GzipCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, conf);
+
+      ext = codec.getDefaultExtension();
+    }
+
+    Path file = getDefaultWorkFile(context, ext);
+    FileSystem fs = file.getFileSystem(conf);
+    FSDataOutputStream fileOut = fs.create(file, false);
+    DataOutputStream ostream = fileOut;
+
+    if (isCompressed) {
+      ostream = new DataOutputStream(codec.createOutputStream(fileOut));
+    }
+
+    return new RawKeyRecordWriter<K, V>(ostream);
+  }
+}
+

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java?rev=816240&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/TextImportMapper.java Thu Sep 17 15:37:22 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+/**
+ * Converts an input record into a string representation and emit it.
+ */
+public class TextImportMapper
+    extends AutoProgressMapper<LongWritable, DBWritable, Text, NullWritable> {
+
+  private Text outkey;
+
+  public TextImportMapper() {
+    outkey = new Text();
+  }
+
+  public void map(LongWritable key, DBWritable val, Context context)
+      throws IOException, InterruptedException {
+    outkey.set(val.toString());
+    context.write(outkey, NullWritable.get());
+  }
+}

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java?rev=816240&r1=816239&r2=816240&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java Thu Sep 17 15:37:22 2009
@@ -23,7 +23,8 @@
 import org.apache.hadoop.sqoop.lib.TestRecordParser;
 import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
 import org.apache.hadoop.sqoop.manager.TestSqlManager;
-import org.apache.hadoop.sqoop.mapred.TestAutoProgressMapRunner;
+import org.apache.hadoop.sqoop.mapred.MapredTests;
+import org.apache.hadoop.sqoop.mapreduce.MapreduceTests;
 import org.apache.hadoop.sqoop.orm.TestClassWriter;
 import org.apache.hadoop.sqoop.orm.TestParseMethods;
 
@@ -32,24 +33,22 @@
 
 /**
  * All tests for Sqoop (org.apache.hadoop.sqoop)
- *
- * 
  */
-public final class AllTests  {
+public final class AllTests {
 
   private AllTests() { }
 
   public static Test suite() {
     TestSuite suite = new TestSuite("Tests for org.apache.hadoop.sqoop");
 
-    suite.addTestSuite(TestAutoProgressMapRunner.class);
     suite.addTestSuite(TestAllTables.class);
     suite.addTestSuite(TestHsqldbManager.class);
     suite.addTestSuite(TestSqlManager.class);
     suite.addTestSuite(TestClassWriter.class);
     suite.addTestSuite(TestColumnTypes.class);
     suite.addTestSuite(TestMultiCols.class);
-    suite.addTestSuite(TestOrderBy.class);
+    suite.addTestSuite(TestMultiMaps.class);
+    suite.addTestSuite(TestSplitBy.class);
     suite.addTestSuite(TestWhere.class);
     suite.addTestSuite(TestHiveImport.class);
     suite.addTestSuite(TestRecordParser.class);
@@ -58,6 +57,8 @@
     suite.addTestSuite(TestParseMethods.class);
     suite.addTestSuite(TestConnFactory.class);
     suite.addTest(ThirdPartyTests.suite());
+    suite.addTest(MapredTests.suite());
+    suite.addTest(MapreduceTests.suite());
 
     return suite;
   }

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestAllTables.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestAllTables.java?rev=816240&r1=816239&r2=816240&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestAllTables.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestAllTables.java Thu Sep 17 15:37:22 2009
@@ -36,9 +36,6 @@
 
 /**
  * Test the --all-tables functionality that can import multiple tables.
- * ;
- * 
- *
  */
 public class TestAllTables extends ImportJobTestCase {
 
@@ -63,6 +60,8 @@
     args.add(getWarehouseDir());
     args.add("--connect");
     args.add(HsqldbTestServer.getUrl());
+    args.add("--num-mappers");
+    args.add("1");
 
     return args.toArray(new String[0]);
   }
@@ -107,7 +106,7 @@
     Path warehousePath = new Path(this.getWarehouseDir());
     for (String tableName : this.tableNames) {
       Path tablePath = new Path(warehousePath, tableName);
-      Path filePath = new Path(tablePath, "part-00000");
+      Path filePath = new Path(tablePath, "part-m-00000");
 
       // dequeue the expected value for this table. This
       // list has the same order as the tableNames list.

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java?rev=816240&r1=816239&r2=816240&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java Thu Sep 17 15:37:22 2009
@@ -36,9 +36,6 @@
  *    write(DataOutput), readFields(DataInput)
  * - And optionally, that we can push to the database:
  *    write(PreparedStatement)
- *
- * 
- *
  */
 public class TestColumnTypes extends ImportJobTestCase {
 
@@ -217,14 +214,24 @@
 
   @Test
   public void testTimestamp2() {
+    try {
+    LOG.debug("Beginning testTimestamp2");
     verifyType("TIMESTAMP", "'2009-04-24 18:24:00.0002'",
         "2009-04-24 18:24:00.000200000",
         "2009-04-24 18:24:00.0002");
+    } finally {
+      LOG.debug("End testTimestamp2");
+    }
   }
 
   @Test
   public void testTimestamp3() {
+    try {
+    LOG.debug("Beginning testTimestamp3");
     verifyType("TIMESTAMP", "null", null);
+    } finally {
+      LOG.debug("End testTimestamp3");
+    }
   }
 
   @Test

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestMultiMaps.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestMultiMaps.java?rev=816240&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestMultiMaps.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestMultiMaps.java Thu Sep 17 15:37:22 2009
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.hadoop.sqoop.ImportOptions.InvalidOptionsException;
+import org.apache.hadoop.sqoop.orm.CompilationManager;
+import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
+import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
+import org.apache.hadoop.sqoop.testutil.SeqFileReader;
+import org.apache.hadoop.sqoop.util.ClassLoaderStack;
+
+/**
+ * Test that using multiple mapper splits works.
+ */
+public class TestMultiMaps extends ImportJobTestCase {
+
+  /**
+   * Create the argv to pass to Sqoop
+   * @return the argv as an array of strings.
+   */
+  private String [] getArgv(boolean includeHadoopFlags, String [] colNames, String splitByCol) {
+    String columnsString = "";
+    for (String col : colNames) {
+      columnsString += col + ",";
+    }
+
+    ArrayList<String> args = new ArrayList<String>();
+
+    if (includeHadoopFlags) {
+      args.add("-D");
+      args.add("mapred.job.tracker=local");
+      args.add("-D");
+      args.add("fs.default.name=file:///");
+    }
+
+    args.add("--table");
+    args.add(HsqldbTestServer.getTableName());
+    args.add("--columns");
+    args.add(columnsString);
+    args.add("--split-by");
+    args.add(splitByCol);
+    args.add("--warehouse-dir");
+    args.add(getWarehouseDir());
+    args.add("--connect");
+    args.add(HsqldbTestServer.getUrl());
+    args.add("--as-sequencefile");
+    args.add("--num-mappers");
+    args.add("2");
+
+    return args.toArray(new String[0]);
+  }
+
+  // this test just uses the two int table.
+  protected String getTableName() {
+    return HsqldbTestServer.getTableName();
+  }
+
+  /** @return a list of Path objects for each data file */
+  protected List<Path> getDataFilePaths() throws IOException {
+    List<Path> paths = new ArrayList<Path>();
+    Configuration conf = new Configuration();
+    conf.set("fs.default.name", "file:///");
+    FileSystem fs = FileSystem.get(conf);
+
+    FileStatus [] stats = fs.listStatus(getTablePath());
+    for (FileStatus stat : stats) {
+      paths.add(stat.getPath());
+    }
+
+    return paths;
+  }
+
+  /**
+   * Given a comma-delimited list of integers, grab and parse the first int
+   * @param str a comma-delimited list of values, the first of which is an int.
+   * @return the first field in the string, cast to int
+   */
+  private int getFirstInt(String str) {
+    String [] parts = str.split(",");
+    return Integer.parseInt(parts[0]);
+  }
+
+  public void runMultiMapTest(String splitByCol, int expectedSum)
+      throws IOException {
+
+    String [] columns = HsqldbTestServer.getFieldNames();
+    ClassLoader prevClassLoader = null;
+    SequenceFile.Reader reader = null;
+
+    String [] argv = getArgv(true, columns, splitByCol);
+    runImport(argv);
+    try {
+      ImportOptions opts = new ImportOptions();
+      opts.parse(getArgv(false, columns, splitByCol));
+
+      CompilationManager compileMgr = new CompilationManager(opts);
+      String jarFileName = compileMgr.getJarFilename();
+
+      prevClassLoader = ClassLoaderStack.addJarFile(jarFileName, getTableName());
+
+      List<Path> paths = getDataFilePaths();
+      Configuration conf = new Configuration();
+      int curSum = 0;
+
+      assertTrue("Found only " + paths.size() + " path(s); expected > 1.", paths.size() > 1);
+
+      // We expect multiple files. We need to open all the files and sum up the
+      // first column across all of them.
+      for (Path p : paths) {
+        reader = SeqFileReader.getSeqFileReader(p.toString());
+
+        // here we can actually instantiate (k, v) pairs.
+        Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+        Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+        // We know that these values are two ints separated by a ',' character.
+        // Since this is all dynamic, though, we don't want to actually link against
+        // the class and use its methods. So we just parse this back into int fields manually.
+        // Sum them up and ensure that we get the expected total for the first column, to
+        // verify that we got all the results from the db into the file.
+
+        // now sum up everything in the file.
+        while (reader.next(key) != null) {
+          reader.getCurrentValue(val);
+          curSum += getFirstInt(val.toString());
+        }
+
+        IOUtils.closeStream(reader);
+        reader = null;
+      }
+
+      assertEquals("Total sum of first db column mismatch", expectedSum, curSum);
+    } catch (InvalidOptionsException ioe) {
+      fail(ioe.toString());
+    } finally {
+      IOUtils.closeStream(reader);
+
+      if (null != prevClassLoader) {
+        ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
+      }
+    }
+  }
+
+  public void testSplitByFirstCol() throws IOException {
+    runMultiMapTest("INTFIELD1", HsqldbTestServer.getFirstColSum());
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestSplitBy.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestSplitBy.java?rev=816240&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestSplitBy.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestSplitBy.java Thu Sep 17 15:37:22 2009
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.hadoop.sqoop.ImportOptions.InvalidOptionsException;
+import org.apache.hadoop.sqoop.orm.CompilationManager;
+import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
+import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
+import org.apache.hadoop.sqoop.testutil.SeqFileReader;
+import org.apache.hadoop.sqoop.util.ClassLoaderStack;
+
+/**
+ * Test that --split-by works
+ */
+public class TestSplitBy extends ImportJobTestCase {
+
+  /**
+   * Create the argv to pass to Sqoop
+   * @return the argv as an array of strings.
+   */
+  private String [] getArgv(boolean includeHadoopFlags, String [] colNames, String splitByCol) {
+    String columnsString = "";
+    for (String col : colNames) {
+      columnsString += col + ",";
+    }
+
+    ArrayList<String> args = new ArrayList<String>();
+
+    if (includeHadoopFlags) {
+      args.add("-D");
+      args.add("mapred.job.tracker=local");
+      args.add("-D");
+      args.add("mapred.map.tasks=1");
+      args.add("-D");
+      args.add("fs.default.name=file:///");
+    }
+
+    args.add("--table");
+    args.add(HsqldbTestServer.getTableName());
+    args.add("--columns");
+    args.add(columnsString);
+    args.add("--split-by");
+    args.add(splitByCol);
+    args.add("--warehouse-dir");
+    args.add(getWarehouseDir());
+    args.add("--connect");
+    args.add(HsqldbTestServer.getUrl());
+    args.add("--as-sequencefile");
+    args.add("--num-mappers");
+    args.add("1");
+
+    return args.toArray(new String[0]);
+  }
+
+  // This test just uses the two-int table.
+  protected String getTableName() {
+    return HsqldbTestServer.getTableName();
+  }
+
+
+  /**
+   * Given a comma-delimited list of integers, grab and parse the first int.
+   * @param str a comma-delimited list of values, the first of which is an int.
+   * @return the first field in the string, parsed as an int
+   */
+  private int getFirstInt(String str) {
+    String [] parts = str.split(",");
+    return Integer.parseInt(parts[0]);
+  }
+
+  public void runSplitByTest(String splitByCol, int expectedSum)
+      throws IOException {
+
+    String [] columns = HsqldbTestServer.getFieldNames();
+    ClassLoader prevClassLoader = null;
+    SequenceFile.Reader reader = null;
+
+    String [] argv = getArgv(true, columns, splitByCol);
+    runImport(argv);
+    try {
+      ImportOptions opts = new ImportOptions();
+      opts.parse(getArgv(false, columns, splitByCol));
+
+      CompilationManager compileMgr = new CompilationManager(opts);
+      String jarFileName = compileMgr.getJarFilename();
+
+      prevClassLoader = ClassLoaderStack.addJarFile(jarFileName, getTableName());
+
+      reader = SeqFileReader.getSeqFileReader(getDataFilePath().toString());
+
+      // here we can actually instantiate (k, v) pairs.
+      Configuration conf = new Configuration();
+      Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+      Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+      // We know that these values are two ints separated by a ',' character.
+      // Since this is all dynamic, though, we don't want to actually link against
+      // the class and use its methods. So we just parse this back into int fields manually.
+      // Sum them up and ensure that we get the expected total for the first column, to
+      // verify that we got all the results from the db into the file.
+
+      // Sum up everything in the file.
+      int curSum = 0;
+      while (reader.next(key) != null) {
+        reader.getCurrentValue(val);
+        curSum += getFirstInt(val.toString());
+      }
+
+      assertEquals("Total sum of first db column mismatch", expectedSum, curSum);
+    } catch (InvalidOptionsException ioe) {
+      fail(ioe.toString());
+    } finally {
+      IOUtils.closeStream(reader);
+
+      if (null != prevClassLoader) {
+        ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
+      }
+    }
+  }
+
+  public void testSplitByFirstCol() throws IOException {
+    String splitByCol = "INTFIELD1";
+    runSplitByTest(splitByCol, HsqldbTestServer.getFirstColSum());
+  }
+
+  public void testSplitBySecondCol() throws IOException {
+    String splitByCol = "INTFIELD2";
+    runSplitByTest(splitByCol, HsqldbTestServer.getFirstColSum());
+  }
+}

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestWhere.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestWhere.java?rev=816240&r1=816239&r2=816240&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestWhere.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestWhere.java Thu Sep 17 15:37:22 2009
@@ -68,13 +68,15 @@
     args.add(columnsString);
     args.add("--where");
     args.add(whereClause);
-    args.add("--order-by");
+    args.add("--split-by");
     args.add("INTFIELD1");
     args.add("--warehouse-dir");
     args.add(getWarehouseDir());
     args.add("--connect");
     args.add(HsqldbTestServer.getUrl());
     args.add("--as-sequencefile");
+    args.add("--num-mappers");
+    args.add("1");
 
     return args.toArray(new String[0]);
   }

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java?rev=816240&r1=816239&r2=816240&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java Thu Sep 17 15:37:22 2009
@@ -61,8 +61,10 @@
     args.add("--connect");
     args.add(HsqldbTestServer.getUrl());
     args.add("--hive-import");
-    args.add("--order-by");
+    args.add("--split-by");
     args.add(getColNames()[0]);
+    args.add("--num-mappers");
+    args.add("1");
 
     if (null != moreArgs) {
       for (String arg: moreArgs) {

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java?rev=816240&r1=816239&r2=816240&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java Thu Sep 17 15:37:22 2009
@@ -198,6 +198,8 @@
     args.add(getCurrentUser());
     args.add("--where");
     args.add("id > 1");
+    args.add("--num-mappers");
+    args.add("1");
 
     if (mysqlOutputDelims) {
       args.add("--mysql-delimiters");

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java?rev=816240&r1=816239&r2=816240&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java Thu Sep 17 15:37:22 2009
@@ -150,6 +150,8 @@
     args.add("--password");
     args.add(AUTH_TEST_PASS);
     args.add("--mysql-delimiters");
+    args.add("--num-mappers");
+    args.add("1");
 
     return args.toArray(new String[0]);
   }

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java?rev=816240&r1=816239&r2=816240&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java Thu Sep 17 15:37:22 2009
@@ -156,6 +156,8 @@
     args.add(ORACLE_USER_NAME);
     args.add("--password");
     args.add(ORACLE_USER_PASS);
+    args.add("--num-mappers");
+    args.add("1");
 
     return args.toArray(new String[0]);
   }

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java?rev=816240&r1=816239&r2=816240&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java Thu Sep 17 15:37:22 2009
@@ -181,7 +181,7 @@
     if (isDirect) {
       filePath = new Path(tablePath, "data-00000");
     } else {
-      filePath = new Path(tablePath, "part-00000");
+      filePath = new Path(tablePath, "part-m-00000");
     }
 
     File tableFile = new File(tablePath.toString());

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapred/MapredTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapred/MapredTests.java?rev=816240&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapred/MapredTests.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapred/MapredTests.java Thu Sep 17 15:37:22 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapred;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+/**
+ * All tests for Sqoop's old mapred API (org.apache.hadoop.sqoop.mapred)
+ */
+public final class MapredTests {
+
+  private MapredTests() { }
+
+  public static Test suite() {
+    TestSuite suite = new TestSuite("Tests for org.apache.hadoop.sqoop.mapred");
+    suite.addTestSuite(TestAutoProgressMapRunner.class);
+    return suite;
+  }
+}
+

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapreduce/MapreduceTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapreduce/MapreduceTests.java?rev=816240&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapreduce/MapreduceTests.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapreduce/MapreduceTests.java Thu Sep 17 15:37:22 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapreduce;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+/**
+ * All tests for Sqoop's new mapreduce API (org.apache.hadoop.sqoop.mapreduce)
+ */
+public final class MapreduceTests {
+
+  private MapreduceTests() { }
+
+  public static Test suite() {
+    TestSuite suite = new TestSuite("Tests for org.apache.hadoop.sqoop.mapreduce");
+    suite.addTestSuite(TestTextImportMapper.class);
+    return suite;
+  }
+}
+

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java?rev=816240&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapreduce/TestTextImportMapper.java Thu Sep 17 15:37:22 2009
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+
+import junit.framework.TestCase;
+
+/**
+ * Test the TextImportMapper
+ */
+public class TestTextImportMapper extends TestCase {
+
+
+  static class DummyDBWritable implements DBWritable {
+    long field;
+
+    public DummyDBWritable(final long val) {
+      this.field = val;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      field = in.readLong();
+    }
+
+    public void write(DataOutput out) throws IOException {
+      out.writeLong(field);
+    }
+
+    public void readFields(ResultSet rs) throws SQLException {
+      field = rs.getLong(1);
+    }
+
+    public void write(PreparedStatement s) throws SQLException {
+      s.setLong(1, field);
+    }
+
+    public String toString() {
+      return "" + field;
+    }
+  }
+
+  public void testTextImport() {
+    TextImportMapper m = new TextImportMapper();
+    MapDriver<LongWritable, DBWritable, Text, NullWritable> driver =
+      new MapDriver<LongWritable, DBWritable, Text, NullWritable>(m);
+
+    driver.withInput(new LongWritable(0), new DummyDBWritable(42))
+          .withOutput(new Text("42"), NullWritable.get())
+          .runTest();
+  }
+}

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java?rev=816240&r1=816239&r2=816240&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java Thu Sep 17 15:37:22 2009
@@ -73,8 +73,8 @@
     args.add("--connect");
     args.add(HsqldbTestServer.getUrl());
     args.add("--as-textfile");
-    args.add("--order-by");
-    args.add("DATA_COL0"); // always order by first column.
+    args.add("--split-by");
+    args.add("DATA_COL0"); // always split by first column.
     args.add("--fields-terminated-by");
     args.add(fieldTerminator);
     args.add("--lines-terminated-by");
@@ -87,7 +87,8 @@
       args.add("--optionally-enclosed-by");
     }
     args.add(encloser);
-
+    args.add("--num-mappers");
+    args.add("1");
 
     return args.toArray(new String[0]);
   }

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java?rev=816240&r1=816239&r2=816240&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java Thu Sep 17 15:37:22 2009
@@ -277,7 +277,7 @@
       colNames = getColNames();
     }
 
-    String orderByCol = colNames[0];
+    String splitByCol = colNames[0];
     String columnsString = "";
     for (String col : colNames) {
       columnsString += col + ",";
@@ -298,13 +298,15 @@
     args.add(getTableName());
     args.add("--columns");
     args.add(columnsString);
-    args.add("--order-by");
-    args.add(orderByCol);
+    args.add("--split-by");
+    args.add(splitByCol);
     args.add("--warehouse-dir");
     args.add(getWarehouseDir());
     args.add("--connect");
     args.add(HsqldbTestServer.getUrl());
     args.add("--as-sequencefile");
+    args.add("--num-mappers");
+    args.add("1");
 
     return args.toArray(new String[0]);
   }
@@ -316,7 +318,7 @@
   }
 
   protected Path getDataFilePath() {
-    return new Path(getTablePath(), "part-00000");
+    return new Path(getTablePath(), "part-m-00000");
   }
 
   protected void removeTableDir() {
@@ -350,8 +352,7 @@
       ret = ToolRunner.run(importer, getArgv(true, importCols));
     } catch (Exception e) {
       LOG.error("Got exception running Sqoop: " + e.toString());
-      e.printStackTrace();
-      ret = 1;
+      throw new RuntimeException(e);
     }
 
     // expect a successful return.


