hadoop-mapreduce-commits mailing list archives

From tomwh...@apache.org
Subject svn commit: r925442 - in /hadoop/mapreduce/trunk: ./ src/contrib/sqoop/doc/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred...
Date Fri, 19 Mar 2010 21:38:24 GMT
Author: tomwhite
Date: Fri Mar 19 21:38:24 2010
New Revision: 925442

URL: http://svn.apache.org/viewvc?rev=925442&view=rev
Log:
MAPREDUCE-1460. Oracle support in DataDrivenDBInputFormat. Contributed by Aaron Kimball.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDateSplitter.java
Removed:
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/AutoProgressMapRunner.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/RawKeyTextOutputFormat.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/TextImportMapper.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapred/MapredTests.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/mapred/TestAutoProgressMapRunner.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/sqoop/doc/api-reference.txt
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/SmokeTests.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleCompatTest.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=925442&r1=925441&r2=925442&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Mar 19 21:38:24 2010
@@ -220,6 +220,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1593.  [Rumen] Improvements to random seed generation (tamas via
     mahadev)
 
+    MAPREDUCE-1460. Oracle support in DataDrivenDBInputFormat.
+    (Aaron Kimball via tomwhite)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/doc/api-reference.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/doc/api-reference.txt?rev=925442&r1=925441&r2=925442&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/doc/api-reference.txt (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/doc/api-reference.txt Fri Mar 19 21:38:24 2010
@@ -175,7 +175,6 @@ The following subpackages under +org.apa
 * +lib+ - The external public API (described earlier).
 * +manager+ - The +ConnManager+ and +ManagerFactory+ interface and their
   implementations.
-* +mapred+ - Classes interfacing with the old (pre-0.20) MapReduce API.
 * +mapreduce+ - Classes interfacing with the new (0.20+) MapReduce API.
 * +orm+ - Code auto-generation.
 * +util+ - Miscellaneous utility classes.
@@ -188,12 +187,9 @@ unsplittable compression libraries (e.g.
 Sqoop import while still allowing subsequent MapReduce jobs to use multiple
 input splits per dataset.
 
-Code in the +mapred+ package should be considered deprecated. The +mapreduce+
-package contains +DataDrivenImportJob+, which uses the +DataDrivenDBInputFormat+
-introduced in 0.21. The mapred package contains +ImportJob+, which uses the
-older +DBInputFormat+. Most +ConnManager+ implementations use
-+DataDrivenImportJob+; +DataDrivenDBInputFormat+ does not currently work with
-Oracle in all circumstances, so it remains on the old code-path.
+The +mapreduce+ package contains +DataDrivenImportJob+, which uses the
++DataDrivenDBInputFormat+ introduced in 0.21.  Most +ConnManager+
+implementations use +DataDrivenImportJob+ to perform their imports.
 
 The +orm+ package contains code used for class generation. It depends on the
 JDK's tools.jar which provides the com.sun.tools.javac package.

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java?rev=925442&r1=925441&r2=925442&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java Fri Mar 19 21:38:24 2010
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.sqoop.manager;
 
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
 import org.apache.hadoop.sqoop.SqoopOptions;
 
 /**
@@ -29,11 +31,13 @@ public class ImportJobContext {
   private String tableName;
   private String jarFile;
   private SqoopOptions options;
+  private Class<? extends InputFormat> inputFormatClass;
 
   public ImportJobContext(final String table, final String jar, final SqoopOptions opts) {
     this.tableName = table;
     this.jarFile = jar;
     this.options = opts;
+    this.inputFormatClass = DataDrivenDBInputFormat.class;
   }
 
   /** @return the name of the table to import. */
@@ -52,5 +56,15 @@ public class ImportJobContext {
   public SqoopOptions getOptions() {
     return options;
   }
+
+  /** Set the InputFormat to use for the import job. */
+  public void setInputFormat(Class<? extends InputFormat> ifClass) {
+    this.inputFormatClass = ifClass;
+  }
+
+  /** @return the InputFormat to use for the import job. */
+  public Class<? extends InputFormat> getInputFormat() {
+    return this.inputFormatClass;
+  }
 }
 

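The getter/setter plumbing added to ImportJobContext above is small enough to sketch in isolation: the context defaults to the generic data-driven format, and a manager may swap in a database-specific one before the import runs. The class names below (`FormatContext`, `GenericFormat`, `OracleFormat`) are illustrative stand-ins, not Sqoop API:

```java
// Illustrative stand-ins for the real InputFormat classes.
class GenericFormat {}
class OracleFormat extends GenericFormat {}

/** Sketch of ImportJobContext's InputFormat plumbing. */
class FormatContext {
  private Class<? extends GenericFormat> inputFormatClass;

  FormatContext() {
    // Default mirrors ImportJobContext: the generic data-driven format.
    this.inputFormatClass = GenericFormat.class;
  }

  void setInputFormat(Class<? extends GenericFormat> ifClass) {
    this.inputFormatClass = ifClass;
  }

  Class<? extends GenericFormat> getInputFormat() {
    return this.inputFormatClass;
  }
}
```

A manager that needs a database-specific format calls `setInputFormat(...)` before delegating to the shared import path, which reads the class back via `getInputFormat()`.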
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java?rev=925442&r1=925441&r2=925442&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java Fri Mar 19 21:38:24 2010
@@ -32,8 +32,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.db.OracleDataDrivenDBInputFormat;
 import org.apache.hadoop.sqoop.SqoopOptions;
-import org.apache.hadoop.sqoop.mapred.ImportJob;
 import org.apache.hadoop.sqoop.util.ImportException;
 
 /**
@@ -131,30 +131,11 @@ public class OracleManager extends Gener
     }
   }
 
-  /**
-   * This importTable() implementation continues to use the older DBInputFormat
-   * because DataDrivenDBInputFormat does not currently work with Oracle.
-   */
   public void importTable(ImportJobContext context)
       throws IOException, ImportException {
-
-    String tableName = context.getTableName();
-    String jarFile = context.getJarFile();
-    SqoopOptions options = context.getOptions();
-    ImportJob importer = new ImportJob(options);
-    String splitCol = options.getSplitByCol();
-    if (null == splitCol) {
-      // If the user didn't specify a splitting column, try to infer one.
-      splitCol = getPrimaryKey(tableName);
-    }
-
-    if (null == splitCol) {
-      // Can't infer a primary key.
-      throw new ImportException("No primary key could be found for table " + tableName
-          + ". Please specify one with --split-by.");
-    }
-
-    importer.runImport(tableName, jarFile, splitCol, options.getConf());
+    // Specify the Oracle-specific DBInputFormat for import.
+    context.setInputFormat(OracleDataDrivenDBInputFormat.class);
+    super.importTable(context);
   }
 
   @Override

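The new `OracleManager.importTable()` reduces to a template-method pattern: set the Oracle-specific InputFormat on the context, then fall through to the shared superclass code path. A minimal stand-alone sketch of that delegation (the class names here are hypothetical stand-ins, not Sqoop classes):

```java
// Hypothetical stand-in for SqlManager's shared import path.
class BaseManager {
  protected String inputFormat = "DataDrivenDBInputFormat";
  private String lastImport;

  void importTable(String table) {
    // Shared code path: run the import with whatever format is configured.
    lastImport = table + " via " + inputFormat;
  }

  String lastImport() {
    return lastImport;
  }
}

// Hypothetical stand-in mirroring the new OracleManager.importTable().
class OracleStyleManager extends BaseManager {
  @Override
  void importTable(String table) {
    // Swap in the Oracle-specific format, then reuse the superclass
    // implementation instead of maintaining a separate code path.
    inputFormat = "OracleDataDrivenDBInputFormat";
    super.importTable(table);
  }
}
```

This is what lets the commit delete the entire old `mapred`-package import path: Oracle no longer needs its own `importTable()` logic, only a different format class.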
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java?rev=925442&r1=925441&r2=925442&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java Fri Mar 19 21:38:24 2010
@@ -281,7 +281,8 @@ public abstract class SqlManager extends
     String tableName = context.getTableName();
     String jarFile = context.getJarFile();
     SqoopOptions options = context.getOptions();
-    DataDrivenImportJob importer = new DataDrivenImportJob(options);
+    DataDrivenImportJob importer =
+        new DataDrivenImportJob(options, context.getInputFormat());
     String splitCol = options.getSplitByCol();
     if (null == splitCol) {
       // If the user didn't specify a splitting column, try to infer one.

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java?rev=925442&r1=925441&r2=925442&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java Fri Mar 19 21:38:24 2010
@@ -31,6 +31,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -57,6 +58,7 @@ public class DataDrivenImportJob {
 
   private final SqoopOptions options;
   private final Class<Mapper> mapperClass;
+  private final Class<? extends InputFormat> inputFormatClass;
 
   // For dependency-injection purposes, we can specify a mapper class
   // to use during tests.
@@ -65,7 +67,13 @@ public class DataDrivenImportJob {
 
   @SuppressWarnings("unchecked")
   public DataDrivenImportJob(final SqoopOptions opts) {
+    this(opts, DataDrivenDBInputFormat.class);
+  }
+
+  public DataDrivenImportJob(final SqoopOptions opts,
+      final Class<? extends InputFormat> inputFormatClass) {
     this.options = opts;
+    this.inputFormatClass = inputFormatClass;
     this.mapperClass = (Class<Mapper>) opts.getConf().getClass(
         DATA_DRIVEN_MAPPER_KEY, null);
   }
@@ -152,8 +160,6 @@ public class DataDrivenImportJob {
       job.getConfiguration().setInt("mapred.map.tasks", numMapTasks);
       job.setNumReduceTasks(0);
 
-      job.setInputFormatClass(DataDrivenDBInputFormat.class);
-
       FileOutputFormat.setOutputPath(job, outputPath);
 
       ConnManager mgr = new ConnFactory(conf).getManager(options);
@@ -190,6 +196,9 @@ public class DataDrivenImportJob {
           mgr.escapeColName(splitByCol), sqlColNames);
       job.getConfiguration().set(DBConfiguration.INPUT_CLASS_PROPERTY, tableClassName);
 
+      LOG.debug("Using InputFormat: " + inputFormatClass);
+      job.setInputFormatClass(inputFormatClass);
+
       PerfCounters counters = new PerfCounters();
       counters.startClock();
 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java?rev=925442&r1=925441&r2=925442&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java Fri Mar 19 21:38:24 2010
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.sqoop;
 
-import org.apache.hadoop.sqoop.mapred.MapredTests;
-
 import junit.framework.Test;
 import junit.framework.TestSuite;
 
@@ -35,7 +33,6 @@ public final class AllTests {
 
     suite.addTest(SmokeTests.suite());
     suite.addTest(ThirdPartyTests.suite());
-    suite.addTest(MapredTests.suite());
 
     return suite;
   }

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/SmokeTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/SmokeTests.java?rev=925442&r1=925441&r2=925442&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/SmokeTests.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/SmokeTests.java Fri Mar 19 21:38:24 2010
@@ -25,7 +25,6 @@ import org.apache.hadoop.sqoop.lib.TestF
 import org.apache.hadoop.sqoop.lib.TestRecordParser;
 import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
 import org.apache.hadoop.sqoop.manager.TestSqlManager;
-import org.apache.hadoop.sqoop.mapred.MapredTests;
 import org.apache.hadoop.sqoop.mapreduce.MapreduceTests;
 import org.apache.hadoop.sqoop.orm.TestClassWriter;
 import org.apache.hadoop.sqoop.orm.TestParseMethods;

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleCompatTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleCompatTest.java?rev=925442&r1=925441&r2=925442&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleCompatTest.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleCompatTest.java Fri Mar 19 21:38:24 2010
@@ -68,11 +68,6 @@ public class OracleCompatTest extends Ma
     OracleUtils.dropTable(table, getManager());
   }
 
-  @Override
-  protected Path getDataFilePath() {
-    return new Path(getTablePath(), "part-00000");
-  }
-
   private String padString(int width, String str) {
     int extra = width - str.length();
     for (int i = 0; i < extra; i++) {

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java?rev=925442&r1=925441&r2=925442&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java Fri Mar 19 21:38:24 2010
@@ -173,7 +173,7 @@ public class OracleManagerTest extends I
 
     Path warehousePath = new Path(this.getWarehouseDir());
     Path tablePath = new Path(warehousePath, TABLE_NAME);
-    Path filePath = new Path(tablePath, "part-00000");
+    Path filePath = new Path(tablePath, "part-m-00000");
 
     File tableFile = new File(tablePath.toString());
     if (tableFile.exists() && tableFile.isDirectory()) {

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java?rev=925442&r1=925441&r2=925442&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java Fri Mar 19 21:38:24 2010
@@ -36,7 +36,6 @@ import org.apache.hadoop.util.Reflection
 
 import org.apache.hadoop.sqoop.SqoopOptions;
 import org.apache.hadoop.sqoop.SqoopOptions.InvalidOptionsException;
-import org.apache.hadoop.sqoop.mapred.RawKeyTextOutputFormat;
 import org.apache.hadoop.sqoop.orm.CompilationManager;
 import org.apache.hadoop.sqoop.testutil.CommonArgs;
 import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
@@ -134,7 +133,6 @@ public class TestParseMethods extends Im
 
       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(NullWritable.class);
-      job.setOutputFormat(RawKeyTextOutputFormat.class);
 
       JobClient.runJob(job);
     } catch (InvalidOptionsException ioe) {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java?rev=925442&r1=925441&r2=925442&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java Fri Mar 19 21:38:24 2010
@@ -266,18 +266,21 @@ public class DataDrivenDBInputFormat<T e
     Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
     String dbProductName = getDBProductName();
 
+    LOG.debug("Creating db record reader for db product: " + dbProductName);
+
     try {
       // use database product name to determine appropriate record reader.
       if (dbProductName.startsWith("MYSQL")) {
         // use MySQL-specific db reader.
         return new MySQLDataDrivenDBRecordReader<T>(split, inputClass,
-            conf, getConnection(), dbConf, dbConf.getInputConditions(), dbConf.getInputFieldNames(),
-            dbConf.getInputTableName());
+            conf, getConnection(), dbConf, dbConf.getInputConditions(),
+            dbConf.getInputFieldNames(), dbConf.getInputTableName());
       } else {
         // Generic reader.
         return new DataDrivenDBRecordReader<T>(split, inputClass,
-            conf, getConnection(), dbConf, dbConf.getInputConditions(), dbConf.getInputFieldNames(),
-            dbConf.getInputTableName());
+            conf, getConnection(), dbConf, dbConf.getInputConditions(),
+            dbConf.getInputFieldNames(), dbConf.getInputTableName(),
+            dbProductName);
       }
     } catch (SQLException ex) {
       throw new IOException(ex.getMessage());

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java?rev=925442&r1=925441&r2=925442&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java Fri Mar 19 21:38:24 2010
@@ -54,15 +54,18 @@ public class DataDrivenDBRecordReader<T 
 
   private static final Log LOG = LogFactory.getLog(DataDrivenDBRecordReader.class);
 
+  private String dbProductName; // database manufacturer string.
+
   /**
    * @param split The InputSplit to read data for
    * @throws SQLException 
    */
   public DataDrivenDBRecordReader(DBInputFormat.DBInputSplit split,
       Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
-      String cond, String [] fields, String table)
+      String cond, String [] fields, String table, String dbProduct)
       throws SQLException {
     super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
+    this.dbProductName = dbProduct;
   }
 
   /** Returns the query for selecting the records,
@@ -96,7 +99,11 @@ public class DataDrivenDBRecordReader<T 
       }
 
       query.append(" FROM ").append(tableName);
-      query.append(" AS ").append(tableName); //in hsqldb this is necessary
+      if (!dbProductName.startsWith("ORACLE")) {
+        // Seems to be necessary for hsqldb? Oracle explicitly does *not*
+        // use this clause.
+        query.append(" AS ").append(tableName);
+      }
       query.append(" WHERE ");
       if (conditions != null && conditions.length() > 0) {
         // Put the user's conditions first.
@@ -119,6 +126,8 @@ public class DataDrivenDBRecordReader<T 
           conditionClauses.toString()));
     }
 
+    LOG.debug("Using query: " + query.toString());
+
     return query.toString();
   }
 }

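The product-dependent `FROM` clause is the behavioral core of the hunk above: hsqldb appears to need the self-alias, while Oracle does not accept the `AS` keyword in that position. That one branch can be checked in isolation; the helper class below is an illustrative extraction, not Hadoop API:

```java
/** Illustrative extraction of DataDrivenDBRecordReader's FROM-clause logic. */
class FromClauseSketch {
  static String fromClause(String tableName, String dbProductName) {
    StringBuilder query = new StringBuilder();
    query.append(" FROM ").append(tableName);
    if (!dbProductName.startsWith("ORACLE")) {
      // Seems to be necessary for hsqldb; Oracle rejects AS here.
      query.append(" AS ").append(tableName);
    }
    return query.toString();
  }
}
```

Threading the `dbProductName` string through the constructor, as the diff does, is what makes this branch reachable from the generic reader without an Oracle-specific subclass of the query builder.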
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java?rev=925442&r1=925441&r2=925442&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java Fri Mar 19 21:38:24 2010
@@ -52,8 +52,8 @@ public class DateSplitter extends Intege
     minVal = resultSetColToLong(results, 1, sqlDataType);
     maxVal = resultSetColToLong(results, 2, sqlDataType);
 
-    String lowClausePrefix = colName + " >= '";
-    String highClausePrefix = colName + " < '";
+    String lowClausePrefix = colName + " >= ";
+    String highClausePrefix = colName + " < ";
 
     int numSplits = conf.getInt("mapred.map.tasks", 1);
     if (numSplits < 1) {
@@ -99,13 +99,13 @@ public class DateSplitter extends Intege
         }
         // This is the last one; use a closed interval.
         splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-            lowClausePrefix + startDate.toString() + "'",
-            colName + " <= '" + endDate.toString() + "'"));
+            lowClausePrefix + dateToString(startDate),
+            colName + " <= " + dateToString(endDate)));
       } else {
         // Normal open-interval case.
         splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-            lowClausePrefix + startDate.toString() + "'",
-            highClausePrefix + endDate.toString() + "'"));
+            lowClausePrefix + dateToString(startDate),
+            highClausePrefix + dateToString(endDate)));
       }
 
       start = end;
@@ -159,4 +159,15 @@ public class DateSplitter extends Intege
       return null;
     }
   }
+
+  /**
+   * Given a Date 'd', format it as a string for use in a SQL date
+   * comparison operation.
+   * @param d the date to format.
+   * @return the string representing this date in SQL with any appropriate
+   * quotation characters, etc.
+   */
+  protected String dateToString(Date d) {
+    return "'" + d.toString() + "'";
+  }
 }

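The DateSplitter change above moves the quoting out of the clause-prefix strings and into an overridable `dateToString()` hook, so a subclass can emit a different literal form without touching the split logic. A self-contained sketch of that refactoring (the class name is illustrative, not Hadoop API):

```java
import java.util.Date;

/** Illustrative extraction of DateSplitter's clause construction. */
class DateClauseSketch {
  // Default quoting, as in DateSplitter.dateToString(); subclasses
  // (e.g. an Oracle variant) can emit a different literal form.
  protected String dateToString(Date d) {
    return "'" + d.toString() + "'";
  }

  String lowClause(String colName, Date start) {
    return colName + " >= " + dateToString(start);
  }

  String closedHighClause(String colName, Date end) {
    // The last split uses a closed interval, as in the hunk above.
    return colName + " <= " + dateToString(end);
  }
}
```

This is the seam OracleDateSplitter (added below in this commit) plugs into.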
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java?rev=925442&r1=925441&r2=925442&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java Fri Mar 19 21:38:24 2010
@@ -34,7 +34,7 @@ public class MySQLDataDrivenDBRecordRead
   public MySQLDataDrivenDBRecordReader(DBInputFormat.DBInputSplit split,
       Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
       String cond, String [] fields, String table) throws SQLException {
-    super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
+    super(split, inputClass, conf, conn, dbConfig, cond, fields, table, "MYSQL");
   }
 
   // Execute statements for mysql in unbuffered mode.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java?rev=925442&r1=925441&r2=925442&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java Fri Mar 19 21:38:24 2010
@@ -99,9 +99,10 @@ public class OracleDBRecordReader<T exte
    * @param conn      Connection object
    * @throws          SQLException instance
    */
-  private void setSessionTimeZone(Connection conn) throws SQLException {
-    // need to use reflection to call the method setSessionTimeZone on the OracleConnection class
-    // because oracle specific java libraries are not accessible in this context
+  public static void setSessionTimeZone(Connection conn) throws SQLException {
+    // need to use reflection to call the method setSessionTimeZone on
+    // the OracleConnection class because oracle specific java libraries are
+    // not accessible in this context.
     Method method;
     try {
       method = conn.getClass().getMethod(

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java?rev=925442&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java Fri Mar 19 21:38:24 2010
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An InputFormat that reads input data from a SQL table in an Oracle db.
+ */
+public class OracleDataDrivenDBInputFormat<T extends DBWritable>
+    extends DataDrivenDBInputFormat<T> implements Configurable {
+
+  /**
+   * @return the DBSplitter implementation to use to divide the table/query into InputSplits.
+   */
+  @Override
+  protected DBSplitter getSplitter(int sqlDataType) {
+    switch (sqlDataType) {
+    case Types.DATE:
+    case Types.TIME:
+    case Types.TIMESTAMP:
+      return new OracleDateSplitter();
+
+    default:
+      return super.getSplitter(sqlDataType);
+    }
+  }
+
+  @Override
+  protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
+      Configuration conf) throws IOException {
+
+    DBConfiguration dbConf = getDBConf();
+    @SuppressWarnings("unchecked")
+    Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
+
+    try {
+      // Use Oracle-specific db reader
+      return new OracleDataDrivenDBRecordReader<T>(split, inputClass,
+          conf, getConnection(), dbConf, dbConf.getInputConditions(),
+          dbConf.getInputFieldNames(), dbConf.getInputTableName());
+    } catch (SQLException ex) {
+      throw new IOException(ex.getMessage());
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBRecordReader.java?rev=925442&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBRecordReader.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBRecordReader.java Fri Mar 19 21:38:24 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A RecordReader that reads records from an Oracle table via DataDrivenDBRecordReader.
+ */
+public class OracleDataDrivenDBRecordReader<T extends DBWritable>
+    extends DataDrivenDBRecordReader<T> {
+
+  public OracleDataDrivenDBRecordReader(DBInputFormat.DBInputSplit split,
+      Class<T> inputClass, Configuration conf, Connection conn,
+      DBConfiguration dbConfig, String cond, String [] fields,
+      String table) throws SQLException {
+
+    super(split, inputClass, conf, conn, dbConfig, cond, fields, table,
+        "ORACLE");
+
+    // Must initialize the tz used by the connection for Oracle.
+    OracleDBRecordReader.setSessionTimeZone(conn);
+  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDateSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDateSplitter.java?rev=925442&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDateSplitter.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDateSplitter.java Fri Mar 19 21:38:24 2010
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.util.Date;
+
+/**
+ * Implement DBSplitter over date/time values returned by an Oracle db.
+ * Make use of logic from DateSplitter, since this just needs to use
+ * some Oracle-specific functions on the formatting end when generating
+ * InputSplits.
+ */
+public class OracleDateSplitter extends DateSplitter {
+
+  @SuppressWarnings("unchecked")
+  @Override
+  protected String dateToString(Date d) {
+    // Oracle Date objects are always actually Timestamps
+    return "TO_TIMESTAMP('" + d.toString() + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
+  }
+}

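The `TO_TIMESTAMP` wrapper above pins an explicit format mask instead of relying on the Oracle session's NLS date settings, and the mask matches the string form that JDBC `Timestamp.toString()` produces. The output shape can be verified with a deterministic timestamp; the helper class name below is illustrative, not the committed code:

```java
import java.sql.Timestamp;

/** Illustrative copy of OracleDateSplitter.dateToString's formatting. */
class OracleDateLiteralSketch {
  static String dateToString(java.util.Date d) {
    // Timestamp.toString() yields "yyyy-mm-dd hh:mm:ss.fffffffff",
    // which the explicit format mask tells Oracle how to parse.
    return "TO_TIMESTAMP('" + d.toString() + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
  }
}
```

Combined with the overridable `dateToString()` hook in DateSplitter, this is the whole Oracle-specific surface of the date-splitting path.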

