hadoop-mapreduce-commits mailing list archives

From tomwh...@apache.org
Subject svn commit: r796357 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/lib/db/ src/java/org/apache/hadoop/mapreduce/lib/db/
Date Tue, 21 Jul 2009 16:13:02 GMT
Author: tomwhite
Date: Tue Jul 21 16:13:02 2009
New Revision: 796357

URL: http://svn.apache.org/viewvc?rev=796357&view=rev
Log:
MAPREDUCE-716. Make DBInputFormat work with Oracle. Contributed by Aaron Kimball.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/db/DBInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java

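For context, a minimal new-API driver sketch, not part of this commit, showing how a job might be wired against Oracle so that the product-name dispatch added below selects the Oracle-specific record reader. The JDBC URL, credentials, table, columns, and MyRecord (a DBWritable implementation) are all hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;

    public class OracleImportDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical Oracle connection; the driver jar must be on the classpath.
        DBConfiguration.configureDB(conf, "oracle.jdbc.driver.OracleDriver",
            "jdbc:oracle:thin:@//dbhost:1521/ORCL", "scott", "tiger");
        Job job = new Job(conf, "oracle-import");
        job.setInputFormatClass(DBInputFormat.class);
        // MyRecord is a hypothetical DBWritable; EMPNO/ENAME are hypothetical columns.
        DBInputFormat.setInput(job, MyRecord.class,
            "EMPLOYEES", null /* conditions */, "EMPNO" /* orderBy */,
            "EMPNO", "ENAME");
        // ... set mapper, output format, etc., then submit as usual.
      }
    }
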
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=796357&r1=796356&r2=796357&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Jul 21 16:13:02 2009
@@ -239,3 +239,5 @@
     MAPREDUCE-717. Fixes some corner case issues in speculative 
     execution heuristics. (Devaraj Das)
 
+    MAPREDUCE-716. Make DBInputFormat work with Oracle. (Aaron Kimball
+    via tomwhite)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/db/DBInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/db/DBInputFormat.java?rev=796357&r1=796356&r2=796357&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/db/DBInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/db/DBInputFormat.java Tue Jul 21 16:13:02 2009
@@ -19,9 +19,11 @@
 package org.apache.hadoop.mapred.lib.db;
 
 import java.io.IOException;
+import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.InputFormat;
@@ -46,15 +48,16 @@
    * key and DBWritables as value.  
    */
   protected class DBRecordReader extends
-      org.apache.hadoop.mapreduce.lib.db.DBInputFormat<T>.DBRecordReader
+      org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>
       implements RecordReader<LongWritable, T> {
     /**
      * @param split The InputSplit to read data for
      * @throws SQLException 
      */
     protected DBRecordReader(DBInputSplit split, Class<T> inputClass, 
-      JobConf job) throws SQLException {
-     super(split, inputClass, job);
+        JobConf job, Connection conn, DBConfiguration dbConfig, String cond,
+        String [] fields, String table) throws SQLException {
+      super(split, inputClass, job, conn, dbConfig, cond, fields, table);
     }
 
     /** {@inheritDoc} */
@@ -78,6 +81,45 @@
   }
 
   /**
+   * A RecordReader implementation that just passes through to a wrapped
+   * RecordReader built with the new API.
+   */
+  private static class DBRecordReaderWrapper<T extends DBWritable>
+      implements RecordReader<LongWritable, T> {
+
+    private org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> rr;
+    
+    public DBRecordReaderWrapper(
+        org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> inner) {
+      this.rr = inner;
+    }
+
+    public void close() throws IOException {
+      rr.close();
+    }
+
+    public LongWritable createKey() {
+      return new LongWritable();
+    }
+
+    public T createValue() {
+      return rr.createValue();
+    }
+
+    public float getProgress() throws IOException {
+      return rr.getProgress();
+    }
+    
+    public long getPos() throws IOException {
+      return rr.getPos();
+    }
+
+    public boolean next(LongWritable key, T value) throws IOException {
+      return rr.next(key, value);
+    }
+  }
+
+  /**
    * A Class that does nothing, implementing DBWritable
    */
   public static class NullDBWritable extends 
@@ -116,13 +158,11 @@
   public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
       JobConf job, Reporter reporter) throws IOException {
 
-    Class inputClass = super.getDBConf().getInputClass();
-    try {
-      return new DBRecordReader((DBInputSplit) split, inputClass, job);
-    }
-    catch (SQLException ex) {
-      throw new IOException(ex.getMessage());
-    }
+    // wrap the DBRR in a shim class to deal with API differences.
+    return new DBRecordReaderWrapper<T>(
+        (org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>) 
+        createDBRecordReader(
+          (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) split, job));
   }
 
   /** {@inheritDoc} */

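The old-API surface is unchanged by the shim above: a job written against org.apache.hadoop.mapred.lib.db keeps compiling and running, with record reading now delegated to the new-API readers. A minimal sketch under that assumption (driver class, URL, record class, and table names hypothetical):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.db.DBConfiguration;
    import org.apache.hadoop.mapred.lib.db.DBInputFormat;

    public class OldApiConfig {
      public static JobConf configure() {
        JobConf job = new JobConf(OldApiConfig.class);
        job.setInputFormat(DBInputFormat.class);
        DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
            "jdbc:mysql://dbhost/mydb");               // hypothetical URL
        DBInputFormat.setInput(job, MyRecord.class,    // hypothetical DBWritable
            "employees", null /* conditions */, "id" /* orderBy */, "id", "name");
        return job;
      }
    }
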
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java?rev=796357&r1=796356&r2=796357&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java Tue Jul 21 16:13:02 2009
@@ -22,6 +22,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.sql.Connection;
+import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -49,167 +50,10 @@
  * The SQL query and input class can be set using one of the two
  * setInput methods.
  */
-public class DBInputFormat<T  extends DBWritable>
+public class DBInputFormat<T extends DBWritable>
     extends InputFormat<LongWritable, T> implements Configurable {
-  /**
-   * A RecordReader that reads records from a SQL table.
-   * Emits LongWritables containing the record number as 
-   * key and DBWritables as value.  
-   */
-  public class DBRecordReader extends
-      RecordReader<LongWritable, T> {
-    private ResultSet results;
-
-    private Statement statement;
-
-    private Class<T> inputClass;
-
-    private Configuration conf;
-
-    private DBInputSplit split;
-
-    private long pos = 0;
-    
-    private LongWritable key = null;
-    
-    private T value = null;
-
-    /**
-     * @param split The InputSplit to read data for
-     * @throws SQLException 
-     */
-    public DBRecordReader(DBInputSplit split, 
-        Class<T> inputClass, Configuration conf) throws SQLException {
-      this.inputClass = inputClass;
-      this.split = split;
-      this.conf = conf;
-      
-      statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY,
-                    ResultSet.CONCUR_READ_ONLY);
-
-      //statement.setFetchSize(Integer.MIN_VALUE);
-      results = statement.executeQuery(getSelectQuery());
-    }
-
-    /** Returns the query for selecting the records, 
-     * subclasses can override this for custom behaviour.*/
-    protected String getSelectQuery() {
-      StringBuilder query = new StringBuilder();
-      
-      if(dbConf.getInputQuery() == null) {
-        query.append("SELECT ");
-
-        for (int i = 0; i < fieldNames.length; i++) {
-          query.append(fieldNames[i]);
-          if(i != fieldNames.length -1) {
-            query.append(", ");
-          }
-        }
-
-        query.append(" FROM ").append(tableName);
-        query.append(" AS ").append(tableName); //in hsqldb this is necessary
-        if (conditions != null && conditions.length() > 0)
-          query.append(" WHERE (").append(conditions).append(")");
-        String orderBy = dbConf.getInputOrderBy();
-        if(orderBy != null && orderBy.length() > 0) {
-          query.append(" ORDER BY ").append(orderBy);
-        }
-      }
-      else {
-        query.append(dbConf.getInputQuery());
-      }
-
-      try {
-        query.append(" LIMIT ").append(split.getLength());
-        query.append(" OFFSET ").append(split.getStart());
-      }
-      catch (IOException ex) {
-        //ignore, will not throw
-      }
-      return query.toString();
-    }
-
-    /** {@inheritDoc} */
-    public void close() throws IOException {
-      try {
-        connection.commit();
-        results.close();
-        statement.close();
-      } catch (SQLException e) {
-        throw new IOException(e.getMessage());
-      }
-    }
-
-    public void initialize(InputSplit split, TaskAttemptContext context) 
-        throws IOException, InterruptedException {
-      //do nothing
-    }
-
-    /** {@inheritDoc} */
-    public LongWritable getCurrentKey() {
-      return key;  
-    }
-
-    /** {@inheritDoc} */
-    public T getCurrentValue() {
-      return value;
-    }
-
-    /**
-     * @deprecated 
-     */
-    @Deprecated
-    protected T createValue() {
-      return ReflectionUtils.newInstance(inputClass, conf);
-    }
 
-    /**
-     * @deprecated 
-     */
-    @Deprecated
-    protected long getPos() throws IOException {
-      return pos;
-    }
-
-    /**
-     * @deprecated Use {@link #nextKeyValue()}
-     */
-    @Deprecated
-    protected boolean next(LongWritable key, T value) throws IOException {
-      this.key = key;
-      this.value = value;
-      return nextKeyValue();
-    }
-
-    /** {@inheritDoc} */
-    public float getProgress() throws IOException {
-      return pos / (float)split.getLength();
-    }
-
-    /** {@inheritDoc} */
-    public boolean nextKeyValue() throws IOException {
-      try {
-        if (key == null) {
-          key = new LongWritable();
-        }
-        if (value == null) {
-          value = createValue();
-        }
-        if (!results.next())
-          return false;
-
-        // Set the key field value as the output key value
-        key.set(pos + split.getStart());
-
-        value.readFields(results);
-
-        pos ++;
-      } catch (SQLException e) {
-        throw new IOException(e.getMessage());
-      }
-      return true;
-    }
-  }
+  private String dbProductName = "DEFAULT";
 
   /**
    * A Class that does nothing, implementing DBWritable
@@ -308,6 +152,9 @@
       this.connection = dbConf.getConnection();
       this.connection.setAutoCommit(false);
       connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+
+      DatabaseMetaData dbMeta = connection.getMetaData();
+      this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();
     }
     catch (Exception ex) {
       throw new RuntimeException(ex);
@@ -326,26 +173,43 @@
     return dbConf;
   }
   
-  /** {@inheritDoc} */
-  @SuppressWarnings("unchecked")
-  public RecordReader<LongWritable, T> createRecordReader(InputSplit split,
-      TaskAttemptContext context) throws IOException, InterruptedException {  
+  protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
+      Configuration conf) throws IOException {
 
     Class inputClass = dbConf.getInputClass();
     try {
-      return new DBRecordReader((DBInputSplit) split, inputClass,
-                                context.getConfiguration());
-    }
-    catch (SQLException ex) {
+      // use database product name to determine appropriate record reader.
+      if (dbProductName.startsWith("ORACLE")) {
+        // use Oracle-specific db reader.
+        return new OracleDBRecordReader<T>(split, inputClass,
+            conf, connection, getDBConf(), conditions, fieldNames, tableName);
+      } else if (dbProductName.startsWith("MYSQL")) {
+        // use MySQL-specific db reader.
+        return new MySQLDBRecordReader<T>(split, inputClass,
+            conf, connection, getDBConf(), conditions, fieldNames, tableName);
+      } else {
+        // Generic reader.
+        return new DBRecordReader<T>(split, inputClass,
+            conf, connection, getDBConf(), conditions, fieldNames, tableName);
+      }
+    } catch (SQLException ex) {
       throw new IOException(ex.getMessage());
     }
   }
 
   /** {@inheritDoc} */
+  @SuppressWarnings("unchecked")
+  public RecordReader<LongWritable, T> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {  
+
+    return createDBRecordReader((DBInputSplit) split, context.getConfiguration());
+  }
+
+  /** {@inheritDoc} */
   public List<InputSplit> getSplits(JobContext job) throws IOException {
 
-	ResultSet results = null;  
-	Statement statement = null;
+    ResultSet results = null;  
+    Statement statement = null;
     try {
       statement = connection.createStatement();
 

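The dispatch in createDBRecordReader() keys off standard JDBC metadata rather than the connection URL. A quick illustrative check of what string a given database reports (URL and credentials hypothetical):

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class ShowProductName {
      public static void main(String[] args) throws Exception {
        Connection conn = DriverManager.getConnection(
            "jdbc:mysql://dbhost/mydb", "user", "pass");  // hypothetical
        // Typical values include "MySQL" and "Oracle", hence the
        // toUpperCase() before startsWith() in setConf() above.
        System.out.println(conn.getMetaData().getDatabaseProductName());
        conn.close();
      }
    }
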
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java?rev=796357&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java Tue Jul 21 16:13:02 2009
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A RecordReader that reads records from a SQL table.
+ * Emits LongWritables containing the record number as 
+ * key and DBWritables as value.  
+ */
+public class DBRecordReader<T extends DBWritable> extends
+    RecordReader<LongWritable, T> {
+  private ResultSet results;
+
+  private Class<T> inputClass;
+
+  private Configuration conf;
+
+  private DBInputFormat.DBInputSplit split;
+
+  private long pos = 0;
+  
+  private LongWritable key = null;
+  
+  private T value = null;
+
+  private Connection connection;
+
+  private PreparedStatement statement;
+
+  private DBConfiguration dbConf;
+
+  private String conditions;
+
+  private String [] fieldNames;
+
+  private String tableName;
+
+  /**
+   * @param split The InputSplit to read data for
+   * @throws SQLException 
+   */
+  public DBRecordReader(DBInputFormat.DBInputSplit split, 
+      Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
+      String cond, String [] fields, String table)
+      throws SQLException {
+    this.inputClass = inputClass;
+    this.split = split;
+    this.conf = conf;
+    this.connection = conn;
+    this.dbConf = dbConfig;
+    this.conditions = cond;
+    this.fieldNames = fields;
+    this.tableName = table;
+    
+    this.results = executeQuery(getSelectQuery());
+  }
+
+  protected ResultSet executeQuery(String query) throws SQLException {
+    this.statement = connection.prepareStatement(query,
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    return statement.executeQuery();
+  }
+
+  /** Returns the query for selecting the records, 
+   * subclasses can override this for custom behaviour.*/
+  protected String getSelectQuery() {
+    StringBuilder query = new StringBuilder();
+
+    // Default codepath for MySQL, HSQLDB, etc. Relies on LIMIT/OFFSET for splits.
+    if(dbConf.getInputQuery() == null) {
+      query.append("SELECT ");
+  
+      for (int i = 0; i < fieldNames.length; i++) {
+        query.append(fieldNames[i]);
+        if (i != fieldNames.length -1) {
+          query.append(", ");
+        }
+      }
+
+      query.append(" FROM ").append(tableName);
+      query.append(" AS ").append(tableName); //in hsqldb this is necessary
+      if (conditions != null && conditions.length() > 0) {
+        query.append(" WHERE (").append(conditions).append(")");
+      }
+
+      String orderBy = dbConf.getInputOrderBy();
+      if (orderBy != null && orderBy.length() > 0) {
+        query.append(" ORDER BY ").append(orderBy);
+      }
+    } else {
+      //PREBUILT QUERY
+      query.append(dbConf.getInputQuery());
+    }
+        
+    try {
+      query.append(" LIMIT ").append(split.getLength());
+      query.append(" OFFSET ").append(split.getStart());
+    } catch (IOException ex) {
+      // Ignore, will not throw.
+    }
+
+    return query.toString();
+  }
+
+  /** {@inheritDoc} */
+  public void close() throws IOException {
+    try {
+      if (null != results) {
+        results.close();
+      }
+      if (null != statement) {
+        statement.close();
+      }
+      if (null != connection) {
+        connection.commit();
+      }
+    } catch (SQLException e) {
+      throw new IOException(e.getMessage());
+    }
+  }
+
+  public void initialize(InputSplit split, TaskAttemptContext context) 
+      throws IOException, InterruptedException {
+    //do nothing
+  }
+
+  /** {@inheritDoc} */
+  public LongWritable getCurrentKey() {
+    return key;  
+  }
+
+  /** {@inheritDoc} */
+  public T getCurrentValue() {
+    return value;
+  }
+
+  /**
+   * @deprecated 
+   */
+  @Deprecated
+  public T createValue() {
+    return ReflectionUtils.newInstance(inputClass, conf);
+  }
+
+  /**
+   * @deprecated 
+   */
+  @Deprecated
+  public long getPos() throws IOException {
+    return pos;
+  }
+
+  /**
+   * @deprecated Use {@link #nextKeyValue()}
+   */
+  @Deprecated
+  public boolean next(LongWritable key, T value) throws IOException {
+    this.key = key;
+    this.value = value;
+    return nextKeyValue();
+  }
+
+  /** {@inheritDoc} */
+  public float getProgress() throws IOException {
+    return pos / (float)split.getLength();
+  }
+
+  /** {@inheritDoc} */
+  public boolean nextKeyValue() throws IOException {
+    try {
+      if (key == null) {
+        key = new LongWritable();
+      }
+      if (value == null) {
+        value = createValue();
+      }
+      if (!results.next())
+        return false;
+
+      // Set the key field value as the output key value
+      key.set(pos + split.getStart());
+
+      value.readFields(results);
+
+      pos ++;
+    } catch (SQLException e) {
+      throw new IOException(e.getMessage());
+    }
+    return true;
+  }
+
+  protected DBInputFormat.DBInputSplit getSplit() {
+    return split;
+  }
+
+  protected String [] getFieldNames() {
+    return fieldNames;
+  }
+
+  protected String getTableName() {
+    return tableName;
+  }
+
+  protected String getConditions() {
+    return conditions;
+  }
+
+  protected DBConfiguration getDBConf() {
+    return dbConf;
+  }
+
+  protected Connection getConnection() {
+    return connection;
+  }
+}

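To make the generic codepath concrete: for a hypothetical split with start 1000 and length 500 over a table t(id, name), with id as the configured order-by column, getSelectQuery() above emits roughly:

    SELECT id, name FROM t AS t ORDER BY id LIMIT 500 OFFSET 1000

This leans on the LIMIT/OFFSET syntax supported by MySQL and HSQLDB; Oracle rejects it, which is what motivates the Oracle-specific reader added below.
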
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java?rev=796357&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java Tue Jul 21 16:13:02 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A RecordReader that reads records from a MySQL table.
+ */
+public class MySQLDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {
+
+  public MySQLDBRecordReader(DBInputFormat.DBInputSplit split, 
+      Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
+      String cond, String [] fields, String table) throws SQLException {
+    super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
+  }
+
+  // Execute statements for mysql in unbuffered mode.
+  protected ResultSet executeQuery(String query) throws SQLException {
+    PreparedStatement statement = getConnection().prepareStatement(query,
+      ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time.
+    return statement.executeQuery();
+  }
+}

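A note on the fetch-size trick above: with MySQL Connector/J, the combination of TYPE_FORWARD_ONLY, CONCUR_READ_ONLY, and a fetch size of Integer.MIN_VALUE is the documented way to stream rows one at a time instead of buffering the entire result set in client memory, which matters when a split scans a large table. Sketched contrast (assumes an open java.sql.Connection conn and a SELECT string query, with java.sql.* imported):

    // Default Connector/J behavior: the whole ResultSet is materialized in RAM.
    PreparedStatement buffered = conn.prepareStatement(query);

    // Streaming mode, as used by MySQLDBRecordReader.executeQuery() above.
    PreparedStatement streaming = conn.prepareStatement(query,
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
    streaming.setFetchSize(Integer.MIN_VALUE); // row-at-a-time
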
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java?rev=796357&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java Tue Jul 21 16:13:02 2009
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A RecordReader that reads records from an Oracle SQL table.
+ */
+public class OracleDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {
+
+  public OracleDBRecordReader(DBInputFormat.DBInputSplit split, 
+      Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
+      String cond, String [] fields, String table) throws SQLException {
+    super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
+  }
+
+  /** Returns the query for selecting the records from an Oracle DB. */
+  protected String getSelectQuery() {
+    StringBuilder query = new StringBuilder();
+    DBConfiguration dbConf = getDBConf();
+    String conditions = getConditions();
+    String tableName = getTableName();
+    String [] fieldNames = getFieldNames();
+
+    // Oracle-specific codepath to use rownum instead of LIMIT/OFFSET.
+    if(dbConf.getInputQuery() == null) {
+      query.append("SELECT ");
+  
+      for (int i = 0; i < fieldNames.length; i++) {
+        query.append(fieldNames[i]);
+        if (i != fieldNames.length -1) {
+          query.append(", ");
+        }
+      }
+  
+      query.append(" FROM ").append(tableName);
+      if (conditions != null && conditions.length() > 0)
+        query.append(" WHERE ").append(conditions);
+      String orderBy = dbConf.getInputOrderBy();
+      if (orderBy != null && orderBy.length() > 0) {
+        query.append(" ORDER BY ").append(orderBy);
+      }
+    } else {
+      //PREBUILT QUERY
+      query.append(dbConf.getInputQuery());
+    }
+        
+    try {
+      DBInputFormat.DBInputSplit split = getSplit();
+      if (split.getLength() > 0 && split.getStart() > 0){
+        String querystring = query.toString();
+
+        query = new StringBuilder();
+        query.append("SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( ");
+        query.append(querystring);
+        query.append(" ) a WHERE rownum <= ").append(split.getStart());
+        query.append(" + ").append(split.getLength());
+        query.append(" ) WHERE dbif_rno >= ").append(split.getStart());
+      }
+    } catch (IOException ex) {
+      // ignore, will not throw.
+    }
+
+    return query.toString();
+  }
+}

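A worked example of the ROWNUM wrapper: Oracle does not accept the LIMIT/OFFSET clause used by the generic reader, so pagination is done by nesting. For a hypothetical split with start 1000 and length 500 over the base query SELECT EMPNO, ENAME FROM EMPLOYEES ORDER BY EMPNO, getSelectQuery() above produces (wrapped for readability):

    SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM (
      SELECT EMPNO, ENAME FROM EMPLOYEES ORDER BY EMPNO
    ) a WHERE rownum <= 1000 + 500 ) WHERE dbif_rno >= 1000

The inner ORDER BY fixes the row order, the middle query freezes each row's position as dbif_rno, and the outer query keeps only this split's slice. Note that the wrapper is skipped when split.getStart() is 0, so the first split runs the base query unwrapped.
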

