hadoop-common-commits mailing list archives

From: sha...@apache.org
Subject: svn commit: r783981 - in /hadoop/core/trunk: ./ src/examples/org/apache/hadoop/examples/ src/mapred/org/apache/hadoop/mapred/lib/db/ src/mapred/org/apache/hadoop/mapreduce/lib/db/ src/test/ src/test/mapred/org/apache/hadoop/mapred/lib/db/ src/test/mapr...
Date: Fri, 12 Jun 2009 05:15:26 GMT
Author: sharad
Date: Fri Jun 12 05:15:25 2009
New Revision: 783981

URL: http://svn.apache.org/viewvc?rev=783981&view=rev
Log:
HADOOP-5690. Change org.apache.hadoop.examples.DBCountPageView to use new mapreduce api. Contributed by Amareshwari Sriramadasu.

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBWritable.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/package.html
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestDBJob.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestDBOutputFormat.java
Removed:
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/db/TestDBJob.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java
    hadoop/core/trunk/src/test/findbugsExcludeFile.xml

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=783981&r1=783980&r2=783981&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jun 12 05:15:25 2009
@@ -56,6 +56,9 @@
     HADOOP-4359. Support for data access authorization checking on Datanodes.
     (Kan Zhang via rangadi)
 
+    HADOOP-5690. Change org.apache.hadoop.examples.DBCountPageView to use 
+    new mapreduce api. (Amareshwari Sriramadasu via sharad)
+
   NEW FEATURES
 
     HADOOP-4268. Change fsck to use ClientProtocol methods so that the

Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java?rev=783981&r1=783980&r2=783981&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java Fri Jun 12 05:15:25 2009
@@ -32,23 +32,20 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.LongSumReducer;
-import org.apache.hadoop.mapred.lib.db.DBConfiguration;
-import org.apache.hadoop.mapred.lib.db.DBInputFormat;
-import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
-import org.apache.hadoop.mapred.lib.db.DBWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -79,7 +76,8 @@
   private static final String[] AccessFieldNames = {"url", "referrer", "time"};
   private static final String[] PageviewFieldNames = {"url", "pageview"};
   
-  private static final String DB_URL = "jdbc:hsqldb:hsql://localhost/URLAccess";
+  private static final String DB_URL = 
+    "jdbc:hsqldb:hsql://localhost/URLAccess";
   private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver";
   
   private Server server;
@@ -87,7 +85,7 @@
   private void startHsqldbServer() {
     server = new Server();
     server.setDatabasePath(0, 
-        System.getProperty("test.build.data",".") + "/URLAccess");
+        System.getProperty("test.build.data", "/tmp") + "/URLAccess");
     server.setDatabaseName(0, "URLAccess");
     server.start();
   }
@@ -193,10 +191,11 @@
 
 
       //Pages in the site :
-      String[] pages = {"/a", "/b", "/c", "/d", "/e", "/f", "/g", "/h", "/i", "/j"};
+      String[] pages = {"/a", "/b", "/c", "/d", "/e", 
+                        "/f", "/g", "/h", "/i", "/j"};
       //linkMatrix[i] is the array of pages(indexes) that page_i links to.  
-      int[][] linkMatrix = {{1,5,7}, {0,7,4,6,}, {0,1,7,8}, {0,2,4,6,7,9}, {0,1},
-          {0,3,5,9}, {0}, {0,1,3}, {0,2,6}, {0,2,6}};
+      int[][] linkMatrix = {{1,5,7}, {0,7,4,6,}, {0,1,7,8}, 
+        {0,2,4,6,7,9}, {0,1}, {0,3,5,9}, {0}, {0,1,3}, {0,2,6}, {0,2,6}};
 
       //a mini model of user browsing a la pagerank
       int currentPage = random.nextInt(pages.length); 
@@ -211,7 +210,8 @@
 
         int action = random.nextInt(PROBABILITY_PRECISION);
 
-        //go to a new page with probability NEW_PAGE_PROBABILITY / PROBABILITY_PRECISION
+        // go to a new page with probability 
+        // NEW_PAGE_PROBABILITY / PROBABILITY_PRECISION
         if(action < NEW_PAGE_PROBABILITY) { 
           currentPage = random.nextInt(pages.length); // a random page
           referrer = null;
@@ -337,17 +337,15 @@
    * Mapper extracts URLs from the AccessRecord (tuples from db), 
    * and emits a &lt;url,1&gt; pair for each access record. 
    */
-  static class PageviewMapper extends MapReduceBase 
-    implements Mapper<LongWritable, AccessRecord, Text, LongWritable> {
+  static class PageviewMapper extends 
+      Mapper<LongWritable, AccessRecord, Text, LongWritable> {
     
     LongWritable ONE = new LongWritable(1L);
     @Override
-    public void map(LongWritable key, AccessRecord value,
-        OutputCollector<Text, LongWritable> output, Reporter reporter)
-        throws IOException {
-      
+    public void map(LongWritable key, AccessRecord value, Context context)
+        throws IOException, InterruptedException {
       Text oKey = new Text(value.url);
-      output.collect(oKey, ONE);
+      context.write(oKey, ONE);
     }
   }
   
@@ -355,20 +353,19 @@
    * Reducer sums up the pageviews and emits a PageviewRecord, 
    * which will correspond to one tuple in the db.
    */
-  static class PageviewReducer extends MapReduceBase 
-    implements Reducer<Text, LongWritable, PageviewRecord, NullWritable> {
+  static class PageviewReducer extends 
+      Reducer<Text, LongWritable, PageviewRecord, NullWritable> {
     
     NullWritable n = NullWritable.get();
     @Override
-    public void reduce(Text key, Iterator<LongWritable> values,
-        OutputCollector<PageviewRecord, NullWritable> output, Reporter reporter)
-        throws IOException {
+    public void reduce(Text key, Iterable<LongWritable> values, 
+        Context context) throws IOException, InterruptedException {
       
       long sum = 0L;
-      while(values.hasNext()) {
-        sum += values.next().get();
+      for(LongWritable value: values) {
+        sum += value.get();
       }
-      output.collect(new PageviewRecord(key.toString(), sum), n);
+      context.write(new PageviewRecord(key.toString(), sum), n);
     }
   }
   
@@ -385,17 +382,18 @@
     }
     
     initialize(driverClassName, url);
+    Configuration conf = getConf();
+
+    DBConfiguration.configureDB(conf, driverClassName, url);
 
-    JobConf job = new JobConf(getConf(), DBCountPageView.class);
+    Job job = new Job(conf);
         
     job.setJobName("Count Pageviews of URLs");
-
+    job.setJarByClass(DBCountPageView.class);
     job.setMapperClass(PageviewMapper.class);
     job.setCombinerClass(LongSumReducer.class);
     job.setReducerClass(PageviewReducer.class);
 
-    DBConfiguration.configureDB(job, driverClassName, url);
-    
     DBInputFormat.setInput(job, AccessRecord.class, "Access"
         , null, "url", AccessFieldNames);
 
@@ -406,10 +404,9 @@
 
     job.setOutputKeyClass(PageviewRecord.class);
     job.setOutputValueClass(NullWritable.class);
-
+    int ret;
     try {
-      JobClient.runJob(job);
-      
+      ret = job.waitForCompletion(true) ? 0 : 1;
       boolean correct = verify();
       if(!correct) {
         throw new RuntimeException("Evaluation was not correct!");
@@ -417,7 +414,7 @@
     } finally {
       shutdown();    
     }
-    return 0;
+    return ret;
   }
 
   public static void main(String[] args) throws Exception {

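For readers tracking the API migration, the DBCountPageView changes above follow the general new-API shape: mappers and reducers extend org.apache.hadoop.mapreduce.Mapper/Reducer and emit through a Context, and the driver builds a Job, sets the jar by class, and calls waitForCompletion() instead of JobClient.runJob(). A minimal, self-contained sketch of that shape (the class names, job name, and file-based input/output here are illustrative, not part of this commit):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewApiSketch {

  // New-API mapper: extends Mapper directly (no MapReduceBase, no
  // OutputCollector/Reporter) and emits through Context.write().
  static class LineCountMapper
      extends Mapper<LongWritable, Text, Text, LongWritable> {
    private final LongWritable one = new LongWritable(1L);

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      context.write(value, one);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf);                   // replaces new JobConf(conf, ...)
    job.setJobName("new-api sketch");
    job.setJarByClass(NewApiSketch.class);     // replaces the JobConf(conf, class) constructor
    job.setMapperClass(LineCountMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // replaces JobClient.runJob(job), surfacing success/failure as an exit code
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
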
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java?rev=783981&r1=783980&r2=783981&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java Fri Jun 12 05:15:25 2009
@@ -18,70 +18,71 @@
 
 package org.apache.hadoop.mapred.lib.db;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.lib.db.DBInputFormat.NullDBWritable;
 
 /**
- * A container for configuration property names for jobs with DB input/output. 
- * <br>
- * The job can be configured using the static methods in this class, 
- * {@link DBInputFormat}, and {@link DBOutputFormat}. 
- * <p> 
- * Alternatively, the properties can be set in the configuration with proper
- * values. 
- *   
- * @see DBConfiguration#configureDB(JobConf, String, String, String, String)
- * @see DBInputFormat#setInput(JobConf, Class, String, String)
- * @see DBInputFormat#setInput(JobConf, Class, String, String, String, String...)
- * @see DBOutputFormat#setOutput(JobConf, String, String...)
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.db.DBConfiguration} instead 
  */
-public class DBConfiguration {
-
+@Deprecated
+public class DBConfiguration extends 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration {
   /** The JDBC Driver class name */
-  public static final String DRIVER_CLASS_PROPERTY = "mapred.jdbc.driver.class";
+  public static final String DRIVER_CLASS_PROPERTY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.DRIVER_CLASS_PROPERTY;
   
   /** JDBC Database access URL */
-  public static final String URL_PROPERTY = "mapred.jdbc.url";
+  public static final String URL_PROPERTY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.URL_PROPERTY;
 
   /** User name to access the database */
-  public static final String USERNAME_PROPERTY = "mapred.jdbc.username";
+  public static final String USERNAME_PROPERTY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.USERNAME_PROPERTY;
   
   /** Password to access the database */
-  public static final String PASSWORD_PROPERTY = "mapred.jdbc.password";
+  public static final String PASSWORD_PROPERTY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.PASSWORD_PROPERTY;
 
   /** Input table name */
-  public static final String INPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.input.table.name";
+  public static final String INPUT_TABLE_NAME_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.INPUT_TABLE_NAME_PROPERTY;
 
   /** Field names in the Input table */
-  public static final String INPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.input.field.names";
+  public static final String INPUT_FIELD_NAMES_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.INPUT_FIELD_NAMES_PROPERTY;
 
   /** WHERE clause in the input SELECT statement */
-  public static final String INPUT_CONDITIONS_PROPERTY = "mapred.jdbc.input.conditions";
+  public static final String INPUT_CONDITIONS_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.INPUT_CONDITIONS_PROPERTY;
   
   /** ORDER BY clause in the input SELECT statement */
-  public static final String INPUT_ORDER_BY_PROPERTY = "mapred.jdbc.input.orderby";
+  public static final String INPUT_ORDER_BY_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.INPUT_ORDER_BY_PROPERTY;
   
   /** Whole input query, exluding LIMIT...OFFSET */
-  public static final String INPUT_QUERY = "mapred.jdbc.input.query";
+  public static final String INPUT_QUERY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.INPUT_QUERY;
   
   /** Input query to get the count of records */
-  public static final String INPUT_COUNT_QUERY = "mapred.jdbc.input.count.query";
+  public static final String INPUT_COUNT_QUERY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.INPUT_COUNT_QUERY;
   
   /** Class name implementing DBWritable which will hold input tuples */
-  public static final String INPUT_CLASS_PROPERTY = "mapred.jdbc.input.class";
+  public static final String INPUT_CLASS_PROPERTY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.INPUT_CLASS_PROPERTY;
 
   /** Output table name */
-  public static final String OUTPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.output.table.name";
+  public static final String OUTPUT_TABLE_NAME_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY;
 
   /** Field names in the Output table */
-  public static final String OUTPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.output.field.names";  
+  public static final String OUTPUT_FIELD_NAMES_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY;  
 
   /** Number of fields in the Output table */
-  public static final String OUTPUT_FIELD_COUNT_PROPERTY = "mapred.jdbc.output.field.count";  
+  public static final String OUTPUT_FIELD_COUNT_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY;
+
   
   /**
    * Sets the DB access related fields in the JobConf.  
@@ -112,115 +113,8 @@
     configureDB(job, driverClass, dbUrl, null, null);
   }
 
-  private JobConf job;
-
   DBConfiguration(JobConf job) {
-    this.job = job;
-  }
-
-  /** Returns a connection object o the DB 
-   * @throws ClassNotFoundException 
-   * @throws SQLException */
-  Connection getConnection() throws ClassNotFoundException, SQLException{
-
-    Class.forName(job.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
-
-    if(job.get(DBConfiguration.USERNAME_PROPERTY) == null) {
-      return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY));
-    } else {
-      return DriverManager.getConnection(
-          job.get(DBConfiguration.URL_PROPERTY), 
-          job.get(DBConfiguration.USERNAME_PROPERTY), 
-          job.get(DBConfiguration.PASSWORD_PROPERTY));
-    }
-  }
-
-  String getInputTableName() {
-    return job.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY);
-  }
-
-  void setInputTableName(String tableName) {
-    job.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
-  }
-
-  String[] getInputFieldNames() {
-    return job.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
-  }
-
-  void setInputFieldNames(String... fieldNames) {
-    job.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
-  }
-
-  String getInputConditions() {
-    return job.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY);
-  }
-
-  void setInputConditions(String conditions) {
-    if (conditions != null && conditions.length() > 0)
-      job.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions);
-  }
-
-  String getInputOrderBy() {
-    return job.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY);
-  }
-  
-  void setInputOrderBy(String orderby) {
-    if(orderby != null && orderby.length() >0) {
-      job.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby);
-    }
-  }
-  
-  String getInputQuery() {
-    return job.get(DBConfiguration.INPUT_QUERY);
-  }
-  
-  void setInputQuery(String query) {
-    if(query != null && query.length() >0) {
-      job.set(DBConfiguration.INPUT_QUERY, query);
-    }
-  }
-  
-  String getInputCountQuery() {
-    return job.get(DBConfiguration.INPUT_COUNT_QUERY);
-  }
-  
-  void setInputCountQuery(String query) {
-    if(query != null && query.length() >0) {
-      job.set(DBConfiguration.INPUT_COUNT_QUERY, query);
-    }
-  }
-  
-  
-  Class<?> getInputClass() {
-    return job.getClass(DBConfiguration.INPUT_CLASS_PROPERTY, NullDBWritable.class);
-  }
-
-  void setInputClass(Class<? extends DBWritable> inputClass) {
-    job.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class);
-  }
-
-  String getOutputTableName() {
-    return job.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY);
-  }
-
-  void setOutputTableName(String tableName) {
-    job.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
-  }
-
-  String[] getOutputFieldNames() {
-    return job.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY);
-  }
-
-  void setOutputFieldNames(String... fieldNames) {
-    job.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
-  }
-
-  void setOutputFieldCount(int fieldCount) {
-    job.setInt(DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY, fieldCount);
-  }
-  
-  int getOutputFieldCount() {
-    return job.getInt(OUTPUT_FIELD_COUNT_PROPERTY, 0);
+    super(job);
   }
   
 }

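The mapred DBConfiguration above is now a deprecated shell: it re-exports the property-name constants from the new class and delegates its constructor, so the old and new APIs read the same mapred.jdbc.* keys. Against the new class, database access is configured on a plain Configuration rather than a JobConf; a small sketch, reusing the driver class and URL from the DBCountPageView example:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;

public class ConfigureDbSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Sets mapred.jdbc.driver.class and mapred.jdbc.url; two further optional
    // arguments carry a username and password when the database needs them.
    DBConfiguration.configureDB(conf,
        "org.hsqldb.jdbcDriver",
        "jdbc:hsqldb:hsql://localhost/URLAccess");
  }
}
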
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java?rev=783981&r1=783980&r2=783981&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java Fri Jun 12 05:15:25 2009
@@ -18,14 +18,9 @@
 
 package org.apache.hadoop.mapred.lib.db;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
+import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -35,100 +30,31 @@
 import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.Job;
 
 /**
- * A InputFormat that reads input data from an SQL table.
- * <p>
- * DBInputFormat emits LongWritables containing the record number as 
- * key and DBWritables as value. 
- * 
- * The SQL query, and input class can be using one of the two 
- * setInput methods.
+ * @deprecated 
+ * Use {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat} instead.
  */
+@Deprecated
 public class DBInputFormat<T  extends DBWritable>
-  implements InputFormat<LongWritable, T>, JobConfigurable {
+    extends org.apache.hadoop.mapreduce.lib.db.DBInputFormat<T> 
+    implements InputFormat<LongWritable, T>, JobConfigurable {
   /**
    * A RecordReader that reads records from a SQL table.
    * Emits LongWritables containing the record number as 
    * key and DBWritables as value.  
    */
-  protected class DBRecordReader implements
-  RecordReader<LongWritable, T> {
-    private ResultSet results;
-
-    private Statement statement;
-
-    private Class<T> inputClass;
-
-    private JobConf job;
-
-    private DBInputSplit split;
-
-    private long pos = 0;
-
+  protected class DBRecordReader extends
+      org.apache.hadoop.mapreduce.lib.db.DBInputFormat<T>.DBRecordReader
+      implements RecordReader<LongWritable, T> {
     /**
      * @param split The InputSplit to read data for
      * @throws SQLException 
      */
-    protected DBRecordReader(DBInputSplit split, Class<T> inputClass, JobConf job) throws SQLException {
-      this.inputClass = inputClass;
-      this.split = split;
-      this.job = job;
-      
-      statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-
-      //statement.setFetchSize(Integer.MIN_VALUE);
-      results = statement.executeQuery(getSelectQuery());
-    }
-
-    /** Returns the query for selecting the records, 
-     * subclasses can override this for custom behaviour.*/
-    protected String getSelectQuery() {
-      StringBuilder query = new StringBuilder();
-      
-      if(dbConf.getInputQuery() == null) {
-        query.append("SELECT ");
-
-        for (int i = 0; i < fieldNames.length; i++) {
-          query.append(fieldNames[i]);
-          if(i != fieldNames.length -1) {
-            query.append(", ");
-          }
-        }
-
-        query.append(" FROM ").append(tableName);
-        query.append(" AS ").append(tableName); //in hsqldb this is necessary
-        if (conditions != null && conditions.length() > 0)
-          query.append(" WHERE (").append(conditions).append(")");
-        String orderBy = dbConf.getInputOrderBy();
-        if(orderBy != null && orderBy.length() > 0) {
-          query.append(" ORDER BY ").append(orderBy);
-        }
-      }
-      else {
-        query.append(dbConf.getInputQuery());
-      }
-
-      try {
-        query.append(" LIMIT ").append(split.getLength());
-        query.append(" OFFSET ").append(split.getStart());
-      }
-      catch (IOException ex) {
-        //ignore, will not throw
-      }
-      return query.toString();
-    }
-
-    /** {@inheritDoc} */
-    public void close() throws IOException {
-      try {
-        connection.commit();
-        results.close();
-        statement.close();
-      } catch (SQLException e) {
-        throw new IOException(e.getMessage());
-      }
+    protected DBRecordReader(DBInputSplit split, Class<T> inputClass, 
+      JobConf job) throws SQLException {
+     super(split, inputClass, job);
     }
 
     /** {@inheritDoc} */
@@ -138,59 +64,32 @@
 
     /** {@inheritDoc} */
     public T createValue() {
-      return ReflectionUtils.newInstance(inputClass, job);
+      return super.createValue();
     }
 
-    /** {@inheritDoc} */
     public long getPos() throws IOException {
-      return pos;
-    }
-
-    /** {@inheritDoc} */
-    public float getProgress() throws IOException {
-      return pos / (float)split.getLength();
+      return super.getPos();
     }
 
     /** {@inheritDoc} */
     public boolean next(LongWritable key, T value) throws IOException {
-      try {
-        if (!results.next())
-          return false;
-
-        // Set the key field value as the output key value
-        key.set(pos + split.getStart());
-
-        value.readFields(results);
-
-        pos ++;
-      } catch (SQLException e) {
-        throw new IOException(e.getMessage());
-      }
-      return true;
+      return super.next(key, value);
     }
   }
 
   /**
    * A Class that does nothing, implementing DBWritable
    */
-  public static class NullDBWritable implements DBWritable, Writable {
-    @Override
-    public void readFields(DataInput in) throws IOException { }
-    @Override
-    public void readFields(ResultSet arg0) throws SQLException { }
-    @Override
-    public void write(DataOutput out) throws IOException { }
-    @Override
-    public void write(PreparedStatement arg0) throws SQLException { }
+  public static class NullDBWritable extends 
+      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable 
+      implements DBWritable, Writable {
   }
   /**
    * A InputSplit that spans a set of rows
    */
-  protected static class DBInputSplit implements InputSplit {
-
-    private long end = 0;
-    private long start = 0;
-
+  protected static class DBInputSplit extends 
+      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit 
+      implements InputSplit {
     /**
      * Default Constructor
      */
@@ -203,77 +102,13 @@
      * @param end the index of the last row to select
      */
     public DBInputSplit(long start, long end) {
-      this.start = start;
-      this.end = end;
-    }
-
-    /** {@inheritDoc} */
-    public String[] getLocations() throws IOException {
-      // TODO Add a layer to enable SQL "sharding" and support locality
-      return new String[] {};
-    }
-
-    /**
-     * @return The index of the first row to select
-     */
-    public long getStart() {
-      return start;
-    }
-
-    /**
-     * @return The index of the last row to select
-     */
-    public long getEnd() {
-      return end;
-    }
-
-    /**
-     * @return The total row count in this split
-     */
-    public long getLength() throws IOException {
-      return end - start;
-    }
-
-    /** {@inheritDoc} */
-    public void readFields(DataInput input) throws IOException {
-      start = input.readLong();
-      end = input.readLong();
-    }
-
-    /** {@inheritDoc} */
-    public void write(DataOutput output) throws IOException {
-      output.writeLong(start);
-      output.writeLong(end);
+      super(start, end);
     }
   }
 
-  private String conditions;
-
-  private Connection connection;
-
-  private String tableName;
-
-  private String[] fieldNames;
-
-  private DBConfiguration dbConf;
-
   /** {@inheritDoc} */
   public void configure(JobConf job) {
-
-    dbConf = new DBConfiguration(job);
-
-    try {
-      this.connection = dbConf.getConnection();
-      this.connection.setAutoCommit(false);
-      connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
-    }
-    catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-
-    tableName = dbConf.getInputTableName();
-    fieldNames = dbConf.getInputFieldNames();
-    conditions = dbConf.getInputConditions();
+    super.setConf(job);
   }
 
   /** {@inheritDoc} */
@@ -281,7 +116,7 @@
   public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
       JobConf job, Reporter reporter) throws IOException {
 
-    Class inputClass = dbConf.getInputClass();
+    Class inputClass = super.getDBConf().getInputClass();
     try {
       return new DBRecordReader((DBInputSplit) split, inputClass, job);
     }
@@ -292,63 +127,16 @@
 
   /** {@inheritDoc} */
   public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
-
-	ResultSet results = null;  
-	Statement statement = null;
-    try {
-      statement = connection.createStatement();
-
-      results = statement.executeQuery(getCountQuery());
-      results.next();
-
-      long count = results.getLong(1);
-      long chunkSize = (count / chunks);
-
-      results.close();
-      statement.close();
-
-      InputSplit[] splits = new InputSplit[chunks];
-
-      // Split the rows into n-number of chunks and adjust the last chunk
-      // accordingly
-      for (int i = 0; i < chunks; i++) {
-        DBInputSplit split;
-
-        if ((i + 1) == chunks)
-          split = new DBInputSplit(i * chunkSize, count);
-        else
-          split = new DBInputSplit(i * chunkSize, (i * chunkSize)
-              + chunkSize);
-
-        splits[i] = split;
-      }
-
-      return splits;
-    } catch (SQLException e) {
-      try {
-        if (results != null) { results.close(); }
-      } catch (SQLException e1) {}
-      try {
-        if (statement != null) { statement.close(); }
-      } catch (SQLException e1) {}
-      throw new IOException(e.getMessage());
+    List<org.apache.hadoop.mapreduce.InputSplit> newSplits = 
+      super.getSplits(new Job(job));
+    InputSplit[] ret = new InputSplit[newSplits.size()];
+    int i = 0;
+    for (org.apache.hadoop.mapreduce.InputSplit s : newSplits) {
+      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit split = 
+    	(org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit)s;
+      ret[i++] = new DBInputSplit(split.getStart(), split.getEnd());
     }
-  }
-
-  /** Returns the query for getting the total number of rows, 
-   * subclasses can override this for custom behaviour.*/
-  protected String getCountQuery() {
-    
-    if(dbConf.getInputCountQuery() != null) {
-      return dbConf.getInputCountQuery();
-    }
-    
-    StringBuilder query = new StringBuilder();
-    query.append("SELECT COUNT(*) FROM " + tableName);
-
-    if (conditions != null && conditions.length() > 0)
-      query.append(" WHERE " + conditions);
-    return query.toString();
+    return ret;
   }
 
   /**

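One behavioural detail of the bridge above: the old getSplits(JobConf, int chunks) received the chunk count as an argument, whereas the new org.apache.hadoop.mapreduce.lib.db.DBInputFormat.getSplits(JobContext) reads it from the configuration, so the number of DBInputSplits is driven by mapred.map.tasks (defaulting to 1). A hypothetical snippet making that explicit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SplitCountSketch {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());
    // The new DBInputFormat.getSplits(JobContext) divides the table's row
    // count into this many DBInputSplits (the key defaults to 1).
    job.getConfiguration().setInt("mapred.map.tasks", 4);
  }
}
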
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java?rev=783981&r1=783980&r2=783981&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java Fri Jun 12 05:15:25 2009
@@ -23,120 +23,41 @@
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.StringUtils;
 
 /**
- * A OutputFormat that sends the reduce output to a SQL table.
- * <p> 
- * {@link DBOutputFormat} accepts &lt;key,value&gt; pairs, where 
- * key has a type extending DBWritable. Returned {@link RecordWriter} 
- * writes <b>only the key</b> to the database with a batch SQL query.  
- * 
+ * @deprecated Use org.apache.hadoop.mapreduce.lib.db.DBOutputFormat instead
  */
+@Deprecated
 public class DBOutputFormat<K  extends DBWritable, V> 
-implements OutputFormat<K,V> {
-
-  private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
+    extends org.apache.hadoop.mapreduce.lib.db.DBOutputFormat<K, V>
+    implements OutputFormat<K, V> {
 
   /**
    * A RecordWriter that writes the reduce output to a SQL table
    */
-  protected class DBRecordWriter 
-  implements RecordWriter<K, V> {
-
-    private Connection connection;
-    private PreparedStatement statement;
-
-    protected DBRecordWriter(Connection connection
-        , PreparedStatement statement) throws SQLException {
-      this.connection = connection;
-      this.statement = statement;
-      this.connection.setAutoCommit(false);
+  protected class DBRecordWriter extends 
+      org.apache.hadoop.mapreduce.lib.db.DBOutputFormat<K, V>.DBRecordWriter
+      implements RecordWriter<K, V> {
+
+    protected DBRecordWriter(Connection connection, 
+      PreparedStatement statement) throws SQLException {
+      super(connection, statement);
     }
 
     /** {@inheritDoc} */
     public void close(Reporter reporter) throws IOException {
-      try {
-        statement.executeBatch();
-        connection.commit();
-      } catch (SQLException e) {
-        try {
-          connection.rollback();
-        }
-        catch (SQLException ex) {
-          LOG.warn(StringUtils.stringifyException(ex));
-        }
-        throw new IOException(e.getMessage());
-      } finally {
-        try {
-          statement.close();
-          connection.close();
-        }
-        catch (SQLException ex) {
-          throw new IOException(ex.getMessage());
-        }
-      }
-    }
-
-    /** {@inheritDoc} */
-    public void write(K key, V value) throws IOException {
-      try {
-        key.write(statement);
-        statement.addBatch();
-      } catch (SQLException e) {
-        e.printStackTrace();
-      }
+      super.close(null);
     }
   }
 
-  /**
-   * Constructs the query used as the prepared statement to insert data.
-   * 
-   * @param table
-   *          the table to insert into
-   * @param fieldNames
-   *          the fields to insert into. If field names are unknown, supply an
-   *          array of nulls.
-   */
-  protected String constructQuery(String table, String[] fieldNames) {
-    if(fieldNames == null) {
-      throw new IllegalArgumentException("Field names may not be null");
-    }
-
-    StringBuilder query = new StringBuilder();
-    query.append("INSERT INTO ").append(table);
-
-    if (fieldNames.length > 0 && fieldNames[0] != null) {
-      query.append(" (");
-      for (int i = 0; i < fieldNames.length; i++) {
-        query.append(fieldNames[i]);
-        if (i != fieldNames.length - 1) {
-          query.append(",");
-        }
-      }
-      query.append(")");
-    }
-    query.append(" VALUES (");
-
-    for (int i = 0; i < fieldNames.length; i++) {
-      query.append("?");
-      if(i != fieldNames.length - 1) {
-        query.append(",");
-      }
-    }
-    query.append(");");
-
-    return query.toString();
-  }
-
   /** {@inheritDoc} */
   public void checkOutputSpecs(FileSystem filesystem, JobConf job)
   throws IOException {
@@ -146,24 +67,15 @@
   /** {@inheritDoc} */
   public RecordWriter<K, V> getRecordWriter(FileSystem filesystem,
       JobConf job, String name, Progressable progress) throws IOException {
-
-    DBConfiguration dbConf = new DBConfiguration(job);
-    String tableName = dbConf.getOutputTableName();
-    String[] fieldNames = dbConf.getOutputFieldNames();
-    
-    if(fieldNames == null) {
-      fieldNames = new String[dbConf.getOutputFieldCount()];
-    }
-    
+    org.apache.hadoop.mapreduce.RecordWriter<K, V> w = super.getRecordWriter(
+      new TaskAttemptContext(job, 
+            TaskAttemptID.forName(job.get("mapred.task.id"))));
+    org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter writer = 
+     (org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter) w;
     try {
-      Connection connection = dbConf.getConnection();
-      PreparedStatement statement = null;
-  
-      statement = connection.prepareStatement(constructQuery(tableName, fieldNames));
-      return new DBRecordWriter(connection, statement);
-    }
-    catch (Exception ex) {
-      throw new IOException(ex.getMessage());
+      return new DBRecordWriter(writer.getConnection(), writer.getStatement());
+    } catch(SQLException se) {
+      throw new IOException(se);
     }
   }
 

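On the output side, jobs written to the new API wire reduce output to a table with the static setOutput helper referenced from the DBConfiguration javadoc; only the key, which must implement DBWritable, is written, via a batched INSERT. A rough sketch using the Pageview table and field names from the example (the exact setOutput signature is an assumption taken from that javadoc reference, not shown in this diff):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

public class OutputWiringSketch {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());
    job.setOutputFormatClass(DBOutputFormat.class);
    // Reduce output keys (which must implement DBWritable) are written as a
    // batched INSERT INTO Pageview (url, pageview) VALUES (?, ?)
    DBOutputFormat.setOutput(job, "Pageview", "url", "pageview");
  }
}
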
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java?rev=783981&r1=783980&r2=783981&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java Fri Jun 12 05:15:25 2009
@@ -1,75 +1,11 @@
 package org.apache.hadoop.mapred.lib.db;
 
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import org.apache.hadoop.io.Writable;
-
 /**
- * Objects that are read from/written to a database should implement
- * <code>DBWritable</code>. DBWritable, is similar to {@link Writable} 
- * except that the {@link #write(PreparedStatement)} method takes a 
- * {@link PreparedStatement}, and {@link #readFields(ResultSet)} 
- * takes a {@link ResultSet}. 
- * <p>
- * Implementations are responsible for writing the fields of the object 
- * to PreparedStatement, and reading the fields of the object from the 
- * ResultSet. 
- * 
- * <p>Example:</p>
- * If we have the following table in the database :
- * <pre>
- * CREATE TABLE MyTable (
- *   counter        INTEGER NOT NULL,
- *   timestamp      BIGINT  NOT NULL,
- * );
- * </pre>
- * then we can read/write the tuples from/to the table with :
- * <p><pre>
- * public class MyWritable implements Writable, DBWritable {
- *   // Some data     
- *   private int counter;
- *   private long timestamp;
- *       
- *   //Writable#write() implementation
- *   public void write(DataOutput out) throws IOException {
- *     out.writeInt(counter);
- *     out.writeLong(timestamp);
- *   }
- *       
- *   //Writable#readFields() implementation
- *   public void readFields(DataInput in) throws IOException {
- *     counter = in.readInt();
- *     timestamp = in.readLong();
- *   }
- *       
- *   public void write(PreparedStatement statement) throws SQLException {
- *     statement.setInt(1, counter);
- *     statement.setLong(2, timestamp);
- *   }
- *       
- *   public void readFields(ResultSet resultSet) throws SQLException {
- *     counter = resultSet.getInt(1);
- *     timestamp = resultSet.getLong(2);
- *   } 
- * }
- * </pre></p>
+ * @deprecated 
+ * Use {@link org.apache.hadoop.mapreduce.lib.db.DBWritable} instead
  */
-public interface DBWritable {
-
-  /**
-   * Sets the fields of the object in the {@link PreparedStatement}.
-   * @param statement the statement that the fields are put into.
-   * @throws SQLException
-   */
-	public void write(PreparedStatement statement) throws SQLException;
-	
-	/**
-	 * Reads the fields of the object from the {@link ResultSet}. 
-	 * @param resultSet the {@link ResultSet} to get the fields from.
-	 * @throws SQLException
-	 */
-	public void readFields(ResultSet resultSet) throws SQLException ; 
+@Deprecated
+public interface DBWritable 
+    extends org.apache.hadoop.mapreduce.lib.db.DBWritable {
 	
 }

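The usage example dropped from the old interface's javadoc carries over unchanged to the new one; restated against org.apache.hadoop.mapreduce.lib.db.DBWritable, a class backing a hypothetical two-column table (counter INTEGER, timestamp BIGINT) would look roughly like this:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

// Backs a table such as: CREATE TABLE MyTable (counter INTEGER, timestamp BIGINT)
public class MyWritable implements Writable, DBWritable {
  private int counter;
  private long timestamp;

  // Writable: serialization between map and reduce
  public void write(DataOutput out) throws IOException {
    out.writeInt(counter);
    out.writeLong(timestamp);
  }

  public void readFields(DataInput in) throws IOException {
    counter = in.readInt();
    timestamp = in.readLong();
  }

  // DBWritable: bind the fields into the INSERT's PreparedStatement
  public void write(PreparedStatement statement) throws SQLException {
    statement.setInt(1, counter);
    statement.setLong(2, timestamp);
  }

  // DBWritable: populate the fields from a SELECT's ResultSet
  public void readFields(ResultSet resultSet) throws SQLException {
    counter = resultSet.getInt(1);
    timestamp = resultSet.getLong(2);
  }
}
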
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java?rev=783981&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java Fri Jun 12 05:15:25 2009
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
+
+/**
+ * A container for configuration property names for jobs with DB input/output.
+ *  
+ * The job can be configured using the static methods in this class, 
+ * {@link DBInputFormat}, and {@link DBOutputFormat}. 
+ * Alternatively, the properties can be set in the configuration with proper
+ * values. 
+ *   
+ * @see DBConfiguration#configureDB(Configuration, String, String, String, String)
+ * @see DBInputFormat#setInput(Job, Class, String, String)
+ * @see DBInputFormat#setInput(Job, Class, String, String, String, String...)
+ * @see DBOutputFormat#setOutput(Job, String, String...)
+ */
+public class DBConfiguration {
+
+  /** The JDBC Driver class name */
+  public static final String DRIVER_CLASS_PROPERTY = 
+      "mapred.jdbc.driver.class";
+  
+  /** JDBC Database access URL */
+  public static final String URL_PROPERTY = "mapred.jdbc.url";
+
+  /** User name to access the database */
+  public static final String USERNAME_PROPERTY = "mapred.jdbc.username";
+  
+  /** Password to access the database */
+  public static final String PASSWORD_PROPERTY = "mapred.jdbc.password";
+
+  /** Input table name */
+  public static final String INPUT_TABLE_NAME_PROPERTY = 
+      "mapred.jdbc.input.table.name";
+
+  /** Field names in the Input table */
+  public static final String INPUT_FIELD_NAMES_PROPERTY = 
+      "mapred.jdbc.input.field.names";
+
+  /** WHERE clause in the input SELECT statement */
+  public static final String INPUT_CONDITIONS_PROPERTY = 
+      "mapred.jdbc.input.conditions";
+  
+  /** ORDER BY clause in the input SELECT statement */
+  public static final String INPUT_ORDER_BY_PROPERTY = 
+      "mapred.jdbc.input.orderby";
+  
+  /** Whole input query, exluding LIMIT...OFFSET */
+  public static final String INPUT_QUERY = "mapred.jdbc.input.query";
+  
+  /** Input query to get the count of records */
+  public static final String INPUT_COUNT_QUERY = 
+      "mapred.jdbc.input.count.query";
+  
+  /** Class name implementing DBWritable which will hold input tuples */
+  public static final String INPUT_CLASS_PROPERTY = "mapred.jdbc.input.class";
+
+  /** Output table name */
+  public static final String OUTPUT_TABLE_NAME_PROPERTY = 
+      "mapred.jdbc.output.table.name";
+
+  /** Field names in the Output table */
+  public static final String OUTPUT_FIELD_NAMES_PROPERTY = 
+      "mapred.jdbc.output.field.names";  
+
+  /** Number of fields in the Output table */
+  public static final String OUTPUT_FIELD_COUNT_PROPERTY = 
+      "mapred.jdbc.output.field.count";  
+  
+  /**
+   * Sets the DB access related fields in the {@link Configuration}.  
+   * @param conf the configuration
+   * @param driverClass JDBC Driver class name
+   * @param dbUrl JDBC DB access URL. 
+   * @param userName DB access username 
+   * @param passwd DB access passwd
+   */
+  public static void configureDB(Configuration conf, String driverClass, 
+      String dbUrl, String userName, String passwd) {
+
+    conf.set(DRIVER_CLASS_PROPERTY, driverClass);
+    conf.set(URL_PROPERTY, dbUrl);
+    if (userName != null) {
+      conf.set(USERNAME_PROPERTY, userName);
+    }
+    if (passwd != null) {
+      conf.set(PASSWORD_PROPERTY, passwd);
+    }
+  }
+
+  /**
+   * Sets the DB access related fields in the JobConf.  
+   * @param job the job
+   * @param driverClass JDBC Driver class name
+   * @param dbUrl JDBC DB access URL. 
+   */
+  public static void configureDB(Configuration job, String driverClass,
+      String dbUrl) {
+    configureDB(job, driverClass, dbUrl, null, null);
+  }
+
+  private Configuration conf;
+
+  public DBConfiguration(Configuration job) {
+    this.conf = job;
+  }
+
+  /** Returns a connection object o the DB 
+   * @throws ClassNotFoundException 
+   * @throws SQLException */
+  public Connection getConnection() 
+      throws ClassNotFoundException, SQLException {
+
+    Class.forName(conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
+
+    if(conf.get(DBConfiguration.USERNAME_PROPERTY) == null) {
+      return DriverManager.getConnection(
+               conf.get(DBConfiguration.URL_PROPERTY));
+    } else {
+      return DriverManager.getConnection(
+          conf.get(DBConfiguration.URL_PROPERTY), 
+          conf.get(DBConfiguration.USERNAME_PROPERTY), 
+          conf.get(DBConfiguration.PASSWORD_PROPERTY));
+    }
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  public String getInputTableName() {
+    return conf.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY);
+  }
+
+  public void setInputTableName(String tableName) {
+    conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
+  }
+
+  public String[] getInputFieldNames() {
+    return conf.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
+  }
+
+  public void setInputFieldNames(String... fieldNames) {
+    conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
+  }
+
+  public String getInputConditions() {
+    return conf.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY);
+  }
+
+  public void setInputConditions(String conditions) {
+    if (conditions != null && conditions.length() > 0)
+      conf.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions);
+  }
+
+  public String getInputOrderBy() {
+    return conf.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY);
+  }
+  
+  public void setInputOrderBy(String orderby) {
+    if(orderby != null && orderby.length() >0) {
+      conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby);
+    }
+  }
+  
+  public String getInputQuery() {
+    return conf.get(DBConfiguration.INPUT_QUERY);
+  }
+  
+  public void setInputQuery(String query) {
+    if(query != null && query.length() >0) {
+      conf.set(DBConfiguration.INPUT_QUERY, query);
+    }
+  }
+  
+  public String getInputCountQuery() {
+    return conf.get(DBConfiguration.INPUT_COUNT_QUERY);
+  }
+  
+  public void setInputCountQuery(String query) {
+    if(query != null && query.length() > 0) {
+      conf.set(DBConfiguration.INPUT_COUNT_QUERY, query);
+    }
+  }
+  
+  public Class<?> getInputClass() {
+    return conf.getClass(DBConfiguration.INPUT_CLASS_PROPERTY,
+                         NullDBWritable.class);
+  }
+
+  public void setInputClass(Class<? extends DBWritable> inputClass) {
+    conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass,
+                  DBWritable.class);
+  }
+
+  public String getOutputTableName() {
+    return conf.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY);
+  }
+
+  public void setOutputTableName(String tableName) {
+    conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
+  }
+
+  public String[] getOutputFieldNames() {
+    return conf.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY);
+  }
+
+  public void setOutputFieldNames(String... fieldNames) {
+    conf.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
+  }
+
+  public void setOutputFieldCount(int fieldCount) {
+    conf.setInt(DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY, fieldCount);
+  }
+  
+  public int getOutputFieldCount() {
+    return conf.getInt(OUTPUT_FIELD_COUNT_PROPERTY, 0);
+  }
+  
+}
+

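Beyond the static configureDB helpers, the class just added is also used as a per-instance wrapper: the input and output formats construct it around the job's Configuration and call getConnection() to open the JDBC connection. A brief sketch of that path (driver and URL are the placeholder values from the example):

import java.sql.Connection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;

public class ConnectionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DBConfiguration.configureDB(conf,
        "org.hsqldb.jdbcDriver", "jdbc:hsqldb:hsql://localhost/URLAccess");

    // The instance wrapper reads the mapred.jdbc.* keys back out of the
    // Configuration and opens a JDBC connection through DriverManager.
    DBConfiguration dbConf = new DBConfiguration(conf);
    Connection connection = dbConf.getConnection();
    connection.close();
  }
}
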
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java?rev=783981&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java Fri Jun 12 05:15:25 2009
@@ -0,0 +1,454 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+/**
+ * A InputFormat that reads input data from an SQL table.
+ * <p>
+ * DBInputFormat emits LongWritables containing the record number as 
+ * key and DBWritables as value. 
+ * 
+ * The SQL query, and input class can be using one of the two 
+ * setInput methods.
+ */
+public class DBInputFormat<T  extends DBWritable>
+    extends InputFormat<LongWritable, T> implements Configurable {
+  /**
+   * A RecordReader that reads records from a SQL table.
+   * Emits LongWritables containing the record number as 
+   * key and DBWritables as value.  
+   */
+  public class DBRecordReader extends
+      RecordReader<LongWritable, T> {
+    private ResultSet results;
+
+    private Statement statement;
+
+    private Class<T> inputClass;
+
+    private Configuration conf;
+
+    private DBInputSplit split;
+
+    private long pos = 0;
+    
+    private LongWritable key = null;
+    
+    private T value = null;
+
+    /**
+     * @param split The InputSplit to read data for
+     * @throws SQLException 
+     */
+    public DBRecordReader(DBInputSplit split, 
+        Class<T> inputClass, Configuration conf) throws SQLException {
+      this.inputClass = inputClass;
+      this.split = split;
+      this.conf = conf;
+      
+      statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY,
+                    ResultSet.CONCUR_READ_ONLY);
+
+      //statement.setFetchSize(Integer.MIN_VALUE);
+      results = statement.executeQuery(getSelectQuery());
+    }
+
+    /** Returns the query for selecting the records, 
+     * subclasses can override this for custom behaviour.*/
+    protected String getSelectQuery() {
+      StringBuilder query = new StringBuilder();
+      
+      if(dbConf.getInputQuery() == null) {
+        query.append("SELECT ");
+
+        for (int i = 0; i < fieldNames.length; i++) {
+          query.append(fieldNames[i]);
+          if(i != fieldNames.length -1) {
+            query.append(", ");
+          }
+        }
+
+        query.append(" FROM ").append(tableName);
+        query.append(" AS ").append(tableName); //in hsqldb this is necessary
+        if (conditions != null && conditions.length() > 0)
+          query.append(" WHERE (").append(conditions).append(")");
+        String orderBy = dbConf.getInputOrderBy();
+        if(orderBy != null && orderBy.length() > 0) {
+          query.append(" ORDER BY ").append(orderBy);
+        }
+      }
+      else {
+        query.append(dbConf.getInputQuery());
+      }
+
+      try {
+        query.append(" LIMIT ").append(split.getLength());
+        query.append(" OFFSET ").append(split.getStart());
+      }
+      catch (IOException ex) {
+        //ignore, will not throw
+      }
+      return query.toString();
+    }
+
+    /** {@inheritDoc} */
+    public void close() throws IOException {
+      try {
+        connection.commit();
+        results.close();
+        statement.close();
+      } catch (SQLException e) {
+        throw new IOException(e.getMessage());
+      }
+    }
+
+    public void initialize(InputSplit split, TaskAttemptContext context) 
+        throws IOException, InterruptedException {
+      //do nothing
+    }
+
+    /** {@inheritDoc} */
+    public LongWritable getCurrentKey() {
+      return key;  
+    }
+
+    /** {@inheritDoc} */
+    public T getCurrentValue() {
+      return value;
+    }
+
+    /**
+     * @deprecated 
+     */
+    @Deprecated
+    protected T createValue() {
+      return ReflectionUtils.newInstance(inputClass, conf);
+    }
+
+    /**
+     * @deprecated 
+     */
+    @Deprecated
+    protected long getPos() throws IOException {
+      return pos;
+    }
+
+    /**
+     * @deprecated Use {@link #nextKeyValue()}
+     */
+    @Deprecated
+    protected boolean next(LongWritable key, T value) throws IOException {
+      this.key = key;
+      this.value = value;
+      return nextKeyValue();
+    }
+
+    /** {@inheritDoc} */
+    public float getProgress() throws IOException {
+      return pos / (float)split.getLength();
+    }
+
+    /** {@inheritDoc} */
+    public boolean nextKeyValue() throws IOException {
+      try {
+        if (key == null) {
+          key = new LongWritable();
+        }
+        if (value == null) {
+          value = createValue();
+        }
+        if (!results.next())
+          return false;
+
+        // Set the key field value as the output key value
+        key.set(pos + split.getStart());
+
+        value.readFields(results);
+
+        pos ++;
+      } catch (SQLException e) {
+        throw new IOException(e.getMessage());
+      }
+      return true;
+    }
+  }
+
+  /**
+   * A Class that does nothing, implementing DBWritable
+   */
+  public static class NullDBWritable implements DBWritable, Writable {
+    @Override
+    public void readFields(DataInput in) throws IOException { }
+    @Override
+    public void readFields(ResultSet arg0) throws SQLException { }
+    @Override
+    public void write(DataOutput out) throws IOException { }
+    @Override
+    public void write(PreparedStatement arg0) throws SQLException { }
+  }
+  
+  /**
+   * A InputSplit that spans a set of rows
+   */
+  public static class DBInputSplit extends InputSplit implements Writable {
+
+    private long end = 0;
+    private long start = 0;
+
+    /**
+     * Default Constructor
+     */
+    public DBInputSplit() {
+    }
+
+    /**
+     * Convenience Constructor
+     * @param start the index of the first row to select
+     * @param end the index of the last row to select
+     */
+    public DBInputSplit(long start, long end) {
+      this.start = start;
+      this.end = end;
+    }
+
+    /** {@inheritDoc} */
+    public String[] getLocations() throws IOException {
+      // TODO Add a layer to enable SQL "sharding" and support locality
+      return new String[] {};
+    }
+
+    /**
+     * @return The index of the first row to select
+     */
+    public long getStart() {
+      return start;
+    }
+
+    /**
+     * @return The index of the last row to select
+     */
+    public long getEnd() {
+      return end;
+    }
+
+    /**
+     * @return The total row count in this split
+     */
+    public long getLength() throws IOException {
+      return end - start;
+    }
+
+    /** {@inheritDoc} */
+    public void readFields(DataInput input) throws IOException {
+      start = input.readLong();
+      end = input.readLong();
+    }
+
+    /** {@inheritDoc} */
+    public void write(DataOutput output) throws IOException {
+      output.writeLong(start);
+      output.writeLong(end);
+    }
+  }
+
+  private String conditions;
+
+  private Connection connection;
+
+  private String tableName;
+
+  private String[] fieldNames;
+
+  private DBConfiguration dbConf;
+
+  /** {@inheritDoc} */
+  public void setConf(Configuration conf) {
+
+    dbConf = new DBConfiguration(conf);
+
+    try {
+      this.connection = dbConf.getConnection();
+      this.connection.setAutoCommit(false);
+      connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+    }
+    catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+
+    tableName = dbConf.getInputTableName();
+    fieldNames = dbConf.getInputFieldNames();
+    conditions = dbConf.getInputConditions();
+  }
+
+  public Configuration getConf() {
+    return dbConf.getConf();
+  }
+  
+  public DBConfiguration getDBConf() {
+    return dbConf;
+  }
+  
+  /** {@inheritDoc} */
+  @SuppressWarnings("unchecked")
+  public RecordReader<LongWritable, T> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {  
+
+    Class inputClass = dbConf.getInputClass();
+    try {
+      return new DBRecordReader((DBInputSplit) split, inputClass,
+                                context.getConfiguration());
+    }
+    catch (SQLException ex) {
+      throw new IOException(ex.getMessage());
+    }
+  }
+
+  /** {@inheritDoc} */
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+
+    ResultSet results = null;
+    Statement statement = null;
+    try {
+      statement = connection.createStatement();
+
+      results = statement.executeQuery(getCountQuery());
+      results.next();
+
+      long count = results.getLong(1);
+      int chunks = job.getConfiguration().getInt("mapred.map.tasks", 1);
+      long chunkSize = (count / chunks);
+
+      results.close();
+      statement.close();
+
+      List<InputSplit> splits = new ArrayList<InputSplit>();
+
+      // Split the rows into n-number of chunks and adjust the last chunk
+      // accordingly
+      for (int i = 0; i < chunks; i++) {
+        DBInputSplit split;
+
+        if ((i + 1) == chunks)
+          split = new DBInputSplit(i * chunkSize, count);
+        else
+          split = new DBInputSplit(i * chunkSize, (i * chunkSize)
+              + chunkSize);
+
+        splits.add(split);
+      }
+
+      return splits;
+    } catch (SQLException e) {
+      try {
+        if (results != null) { results.close(); }
+      } catch (SQLException e1) {}
+      try {
+        if (statement != null) { statement.close(); }
+      } catch (SQLException e1) {}
+      throw new IOException(e.getMessage());
+    }
+  }
+
+  /** Returns the query for getting the total number of rows;
+   * subclasses can override this for custom behaviour. */
+  protected String getCountQuery() {
+    
+    if(dbConf.getInputCountQuery() != null) {
+      return dbConf.getInputCountQuery();
+    }
+    
+    StringBuilder query = new StringBuilder();
+    query.append("SELECT COUNT(*) FROM " + tableName);
+
+    if (conditions != null && conditions.length() > 0)
+      query.append(" WHERE " + conditions);
+    return query.toString();
+  }
+
+  /**
+   * Initializes the map-part of the job with the appropriate input settings.
+   * 
+   * @param job The map-reduce job
+   * @param inputClass the class object implementing DBWritable, which is the 
+   * Java object holding tuple fields.
+   * @param tableName The table to read data from
+   * @param conditions The conditions to select data with, 
+   * e.g. '(updated > 20070101 AND length > 0)'
+   * @param orderBy the fieldNames in the orderBy clause.
+   * @param fieldNames The field names in the table
+   * @see #setInput(Job, Class, String, String)
+   */
+  public static void setInput(Job job, 
+      Class<? extends DBWritable> inputClass,
+      String tableName,String conditions, 
+      String orderBy, String... fieldNames) {
+    job.setInputFormatClass(DBInputFormat.class);
+    DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
+    dbConf.setInputClass(inputClass);
+    dbConf.setInputTableName(tableName);
+    dbConf.setInputFieldNames(fieldNames);
+    dbConf.setInputConditions(conditions);
+    dbConf.setInputOrderBy(orderBy);
+  }
+  
+  /**
+   * Initializes the map-part of the job with the appropriate input settings.
+   * 
+   * @param job The map-reduce job
+   * @param inputClass the class object implementing DBWritable, which is the 
+   * Java object holding tuple fields.
+   * @param inputQuery the input query to select fields. Example : 
+   * "SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
+   * @param inputCountQuery the input query that returns 
+   * the number of records in the table. 
+   * Example : "SELECT COUNT(f1) FROM Mytable"
+   * @see #setInput(Job, Class, String, String, String, String...)
+   */
+  public static void setInput(Job job,
+      Class<? extends DBWritable> inputClass,
+      String inputQuery, String inputCountQuery) {
+    job.setInputFormatClass(DBInputFormat.class);
+    DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
+    dbConf.setInputClass(inputClass);
+    dbConf.setInputQuery(inputQuery);
+    dbConf.setInputCountQuery(inputCountQuery);
+  }
+}
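As a quick orientation for the input side of this patch, the sketch below shows how the two setInput() overloads above might be wired up. It is illustrative only: the HSQLDB driver class, connection URL, credentials, and the Access table with url, referrer and time columns are assumptions, and DBConfiguration.configureDB() is assumed to record the driver class, URL and credentials in the configuration as the old mapred.lib.db API does. NullDBWritable is used only to keep the sketch compilable; a real job supplies its own DBWritable.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;

public class DBInputSetupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumption: configureDB() stores the JDBC driver class, connection URL
    // and credentials in the configuration, as in the old mapred.lib.db API.
    DBConfiguration.configureDB(conf, "org.hsqldb.jdbcDriver",
        "jdbc:hsqldb:hsql://localhost/URLAccess", "sa", "");

    // Table-based overload: table name, conditions, orderBy, then field names.
    Job tableJob = new Job(conf, "db-input-table");
    DBInputFormat.setInput(tableJob, DBInputFormat.NullDBWritable.class,
        "Access", null /* conditions */, "url" /* orderBy */,
        "url", "referrer", "time");

    // Query-based overload: an arbitrary SELECT plus a matching COUNT query.
    Job queryJob = new Job(conf, "db-input-query");
    DBInputFormat.setInput(queryJob, DBInputFormat.NullDBWritable.class,
        "SELECT url, referrer, time FROM Access ORDER BY url",
        "SELECT COUNT(url) FROM Access");
  }
}

On the split side, getSplits() above divides the COUNT result evenly across mapred.map.tasks, so with, say, 10 matching rows and 3 map tasks the splits cover rows [0, 3), [3, 6) and [6, 10), with the last split absorbing the remainder.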

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBOutputFormat.java?rev=783981&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBOutputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBOutputFormat.java Fri Jun 12 05:15:25 2009
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * An OutputFormat that sends the reduce output to a SQL table.
+ * <p> 
+ * {@link DBOutputFormat} accepts &lt;key,value&gt; pairs, where the 
+ * key has a type implementing DBWritable. The returned {@link RecordWriter} 
+ * writes <b>only the key</b> to the database with a batch SQL query.  
+ * 
+ */
+public class DBOutputFormat<K  extends DBWritable, V> 
+extends OutputFormat<K,V> {
+
+  private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
+  public void checkOutputSpecs(JobContext context) 
+      throws IOException, InterruptedException {}
+
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
+      throws IOException, InterruptedException {
+    return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
+                                   context);
+  }
+
+  /**
+   * A RecordWriter that writes the reduce output to a SQL table
+   */
+  public class DBRecordWriter 
+      extends RecordWriter<K, V> {
+
+    private Connection connection;
+    private PreparedStatement statement;
+
+    public DBRecordWriter() throws SQLException {
+    }
+
+    public DBRecordWriter(Connection connection
+        , PreparedStatement statement) throws SQLException {
+      this.connection = connection;
+      this.statement = statement;
+      this.connection.setAutoCommit(false);
+    }
+
+    public Connection getConnection() {
+      return connection;
+    }
+    
+    public PreparedStatement getStatement() {
+      return statement;
+    }
+    
+    /** {@inheritDoc} */
+    public void close(TaskAttemptContext context) throws IOException {
+      try {
+        statement.executeBatch();
+        connection.commit();
+      } catch (SQLException e) {
+        try {
+          connection.rollback();
+        }
+        catch (SQLException ex) {
+          LOG.warn(StringUtils.stringifyException(ex));
+        }
+        throw new IOException(e.getMessage());
+      } finally {
+        try {
+          statement.close();
+          connection.close();
+        }
+        catch (SQLException ex) {
+          throw new IOException(ex.getMessage());
+        }
+      }
+    }
+
+    /** {@inheritDoc} */
+    public void write(K key, V value) throws IOException {
+      try {
+        key.write(statement);
+        statement.addBatch();
+      } catch (SQLException e) {
+        throw new IOException(e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Constructs the query used as the prepared statement to insert data.
+   * 
+   * @param table
+   *          the table to insert into
+   * @param fieldNames
+   *          the fields to insert into. If field names are unknown, supply an
+   *          array of nulls.
+   */
+  public String constructQuery(String table, String[] fieldNames) {
+    if(fieldNames == null) {
+      throw new IllegalArgumentException("Field names may not be null");
+    }
+
+    StringBuilder query = new StringBuilder();
+    query.append("INSERT INTO ").append(table);
+
+    if (fieldNames.length > 0 && fieldNames[0] != null) {
+      query.append(" (");
+      for (int i = 0; i < fieldNames.length; i++) {
+        query.append(fieldNames[i]);
+        if (i != fieldNames.length - 1) {
+          query.append(",");
+        }
+      }
+      query.append(")");
+    }
+    query.append(" VALUES (");
+
+    for (int i = 0; i < fieldNames.length; i++) {
+      query.append("?");
+      if(i != fieldNames.length - 1) {
+        query.append(",");
+      }
+    }
+    query.append(");");
+
+    return query.toString();
+  }
+
+  /** {@inheritDoc} */
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) 
+      throws IOException {
+    DBConfiguration dbConf = new DBConfiguration(context.getConfiguration());
+    String tableName = dbConf.getOutputTableName();
+    String[] fieldNames = dbConf.getOutputFieldNames();
+    
+    if(fieldNames == null) {
+      fieldNames = new String[dbConf.getOutputFieldCount()];
+    }
+    
+    try {
+      Connection connection = dbConf.getConnection();
+      PreparedStatement statement = null;
+  
+      statement = connection.prepareStatement(
+                    constructQuery(tableName, fieldNames));
+      return new DBRecordWriter(connection, statement);
+    } catch (Exception ex) {
+      throw new IOException(ex.getMessage());
+    }
+  }
+
+  /**
+   * Initializes the reduce-part of the job with 
+   * the appropriate output settings
+   * 
+   * @param job The job
+   * @param tableName The table to insert data into
+   * @param fieldNames The field names in the table.
+   */
+  public static void setOutput(Job job, String tableName, 
+      String... fieldNames) throws IOException {
+    if(fieldNames.length > 0 && fieldNames[0] != null) {
+      DBConfiguration dbConf = setOutput(job, tableName);
+      dbConf.setOutputFieldNames(fieldNames);
+    } else {
+      if (fieldNames.length > 0) {
+        setOutput(job, tableName, fieldNames.length);
+      }
+      else { 
+        throw new IllegalArgumentException(
+          "At least one field name must be specified");
+      }
+    }
+  }
+  
+  /**
+   * Initializes the reduce-part of the job 
+   * with the appropriate output settings
+   * 
+   * @param job The job
+   * @param tableName The table to insert data into
+   * @param fieldCount the number of fields in the table.
+   */
+  public static void setOutput(Job job, String tableName, 
+      int fieldCount) throws IOException {
+    DBConfiguration dbConf = setOutput(job, tableName);
+    dbConf.setOutputFieldCount(fieldCount);
+  }
+  
+  private static DBConfiguration setOutput(Job job,
+      String tableName) throws IOException {
+    job.setOutputFormatClass(DBOutputFormat.class);
+    job.setReduceSpeculativeExecution(false);
+
+    DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
+    
+    dbConf.setOutputTableName(tableName);
+    return dbConf;
+  }
+}
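For the output side, a similar illustrative sketch of the two public setOutput() overloads added above and the INSERT statements that constructQuery() derives from them. The Pageview table, its columns and the connection settings are assumptions, not part of this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

public class DBOutputSetupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical driver, URL and credentials.
    DBConfiguration.configureDB(conf, "org.hsqldb.jdbcDriver",
        "jdbc:hsqldb:hsql://localhost/URLAccess", "sa", "");

    // Named-field overload: the reducer's output key class must implement
    // DBWritable and set one PreparedStatement parameter per listed field.
    Job named = new Job(conf, "db-output-named");
    DBOutputFormat.setOutput(named, "Pageview", "url", "pageview");
    // constructQuery("Pageview", {"url", "pageview"}) then yields:
    //   INSERT INTO Pageview (url,pageview) VALUES (?,?);

    // Field-count overload, for when the column names are not known up front.
    Job counted = new Job(conf, "db-output-counted");
    DBOutputFormat.setOutput(counted, "Pageview", 2);
    //   INSERT INTO Pageview VALUES (?,?);
  }
}

Note that both overloads also disable speculative execution for reduces through the private setOutput() helper, since each reduce task commits its batch directly to the shared table.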

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBWritable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBWritable.java?rev=783981&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBWritable.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBWritable.java Fri Jun 12 05:15:25 2009
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Objects that are read from/written to a database should implement
+ * <code>DBWritable</code>. DBWritable is similar to {@link Writable} 
+ * except that the {@link #write(PreparedStatement)} method takes a 
+ * {@link PreparedStatement}, and {@link #readFields(ResultSet)} 
+ * takes a {@link ResultSet}. 
+ * <p>
+ * Implementations are responsible for writing the fields of the object 
+ * to PreparedStatement, and reading the fields of the object from the 
+ * ResultSet. 
+ * 
+ * <p>Example:</p>
+ * If we have the following table in the database:
+ * <pre>
+ * CREATE TABLE MyTable (
+ *   counter        INTEGER NOT NULL,
+ *   timestamp      BIGINT  NOT NULL
+ * );
+ * </pre>
+ * then we can read/write the tuples from/to the table with:
+ * <pre>
+ * public class MyWritable implements Writable, DBWritable {
+ *   // Some data     
+ *   private int counter;
+ *   private long timestamp;
+ *       
+ *   //Writable#write() implementation
+ *   public void write(DataOutput out) throws IOException {
+ *     out.writeInt(counter);
+ *     out.writeLong(timestamp);
+ *   }
+ *       
+ *   //Writable#readFields() implementation
+ *   public void readFields(DataInput in) throws IOException {
+ *     counter = in.readInt();
+ *     timestamp = in.readLong();
+ *   }
+ *       
+ *   public void write(PreparedStatement statement) throws SQLException {
+ *     statement.setInt(1, counter);
+ *     statement.setLong(2, timestamp);
+ *   }
+ *       
+ *   public void readFields(ResultSet resultSet) throws SQLException {
+ *     counter = resultSet.getInt(1);
+ *     timestamp = resultSet.getLong(2);
+ *   } 
+ * }
+ * </pre>
+ */
+public interface DBWritable {
+
+  /**
+   * Sets the fields of the object in the {@link PreparedStatement}.
+   * @param statement the statement that the fields are put into.
+   * @throws SQLException
+   */
+  public void write(PreparedStatement statement) throws SQLException;
+
+  /**
+   * Reads the fields of the object from the {@link ResultSet}. 
+   * @param resultSet the {@link ResultSet} to get the fields from.
+   * @throws SQLException
+   */
+  public void readFields(ResultSet resultSet) throws SQLException;
+}

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/package.html
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/package.html?rev=783981&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/package.html (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/package.html Fri Jun 12 05:15:25 2009
@@ -0,0 +1,44 @@
+<html>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+<h2>org.apache.hadoop.mapreduce.lib.db Package</h2>
+<p>
+This package contains a library to read records from a database as an 
+input to a mapreduce job, and write the output records to the database.   
+</p>
+<p>
+The database to access can be configured using the static methods in the 
+DBConfiguration class. Jobs reading input from a database should use 
+DBInputFormat#setInput() to set the configuration, and jobs writing 
+their output to the database should use DBOutputFormat#setOutput().
+</p>
+<p> 
+Tuples from/to the database are converted to/from Java objects using 
+DBWritable methods. Typically, for each table in the database, a class 
+implementing DBWritable is defined, which holds the fields of the tuple. 
+The fields of a record are read from the database using 
+DBWritable#readFields(ResultSet), and written to the database using 
+DBWritable#write(PreparedStatement statement). 
+</p>
+<p>
+An example program using both DBInputFormat and DBOutputFormat can be found 
+at src/examples/org/apache/hadoop/examples/DBCountPageView.java.
+</p>
+</body>
+</html>
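Tying the package notes above together, here is a hypothetical end-to-end sketch that reads an Access(url, referrer, time) table, counts rows per url, and writes the totals to a Pageview(url, pageview) table. The schema, the record classes and the connection settings are assumptions made for illustration; the bundled DBCountPageView example remains the authoritative reference.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DBRoundTripSketch {

  /** Hypothetical input record for an Access(url, referrer, time) table. */
  public static class AccessRecord implements Writable, DBWritable {
    String url; String referrer; long time;
    public void write(DataOutput out) throws IOException {
      out.writeUTF(url); out.writeUTF(referrer); out.writeLong(time); }
    public void readFields(DataInput in) throws IOException {
      url = in.readUTF(); referrer = in.readUTF(); time = in.readLong(); }
    public void write(PreparedStatement s) throws SQLException {
      s.setString(1, url); s.setString(2, referrer); s.setLong(3, time); }
    public void readFields(ResultSet r) throws SQLException {
      url = r.getString(1); referrer = r.getString(2); time = r.getLong(3); }
  }

  /** Hypothetical output record for a Pageview(url, pageview) table. */
  public static class PageviewRecord implements Writable, DBWritable {
    String url; long pageview;
    public PageviewRecord() { }
    public PageviewRecord(String url, long pageview) {
      this.url = url; this.pageview = pageview; }
    public void write(DataOutput out) throws IOException {
      out.writeUTF(url); out.writeLong(pageview); }
    public void readFields(DataInput in) throws IOException {
      url = in.readUTF(); pageview = in.readLong(); }
    public void write(PreparedStatement s) throws SQLException {
      s.setString(1, url); s.setLong(2, pageview); }
    public void readFields(ResultSet r) throws SQLException {
      url = r.getString(1); pageview = r.getLong(2); }
  }

  /** Emits (url, 1) for every row handed to it by DBInputFormat. */
  public static class UrlMapper
      extends Mapper<LongWritable, AccessRecord, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    protected void map(LongWritable key, AccessRecord value, Context context)
        throws IOException, InterruptedException {
      context.write(new Text(value.url), ONE);
    }
  }

  /** Sums the counts; only the key (a DBWritable) is written to the table. */
  public static class CountReducer
      extends Reducer<Text, LongWritable, PageviewRecord, NullWritable> {
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      long sum = 0;
      for (LongWritable v : values) { sum += v.get(); }
      context.write(new PageviewRecord(key.toString(), sum), NullWritable.get());
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical driver, URL and credentials.
    DBConfiguration.configureDB(conf, "org.hsqldb.jdbcDriver",
        "jdbc:hsqldb:hsql://localhost/URLAccess", "sa", "");
    Job job = new Job(conf, "db-round-trip");
    job.setJarByClass(DBRoundTripSketch.class);

    DBInputFormat.setInput(job, AccessRecord.class, "Access", null, "url",
        "url", "referrer", "time");
    DBOutputFormat.setOutput(job, "Pageview", "url", "pageview");

    job.setMapperClass(UrlMapper.class);
    job.setReducerClass(CountReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setOutputKeyClass(PageviewRecord.class);
    job.setOutputValueClass(NullWritable.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}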

Modified: hadoop/core/trunk/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/findbugsExcludeFile.xml?rev=783981&r1=783980&r2=783981&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/core/trunk/src/test/findbugsExcludeFile.xml Fri Jun 12 05:15:25 2009
@@ -141,6 +141,17 @@
        <Method name="getSplits" />
        <Bug pattern="DLS_DEAD_LOCAL_STORE" />
      </Match>
+
+     <Match>
+       <Class name="org.apache.hadoop.mapred.lib.db.DBWritable" />
+       <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
+     </Match>
+
+     <Match>
+       <Class name="org.apache.hadoop.mapred.lib.db.DBInputFormat$DBRecordReader" />
+       <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
+     </Match>
+     
     <!--
       This is a spurious warning. Just ignore
     -->

Added: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestDBJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestDBJob.java?rev=783981&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestDBJob.java (added)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestDBJob.java Fri Jun 12 05:15:25 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.IOException;
+
+import org.apache.hadoop.examples.DBCountPageView;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.util.ToolRunner;
+
+
+public class TestDBJob extends HadoopTestCase {
+
+  public TestDBJob() throws IOException {
+    super(LOCAL_MR, LOCAL_FS, 3, 1);
+  }
+  
+  public void testRun() throws Exception {
+    DBCountPageView testDriver = new DBCountPageView();
+    ToolRunner.run(createJobConf(), testDriver, new String[0]);
+  }
+  
+}

Added: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestDBOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestDBOutputFormat.java?rev=783981&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestDBOutputFormat.java (added)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestDBOutputFormat.java Fri Jun 12 05:15:25 2009
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+
+public class TestDBOutputFormat extends TestCase {
+  
+  private String[] fieldNames = new String[] { "id", "name", "value" };
+  private String[] nullFieldNames = new String[] { null, null, null };
+  private String expected = "INSERT INTO hadoop_output " +
+                             "(id,name,value) VALUES (?,?,?);";
+  private String nullExpected = "INSERT INTO hadoop_output VALUES (?,?,?);"; 
+  
+  private DBOutputFormat<DBWritable, NullWritable> format 
+    = new DBOutputFormat<DBWritable, NullWritable>();
+  
+  public void testConstructQuery() {  
+    String actual = format.constructQuery("hadoop_output", fieldNames);
+    assertEquals(expected, actual);
+    
+    actual = format.constructQuery("hadoop_output", nullFieldNames);
+    assertEquals(nullExpected, actual);
+  }
+  
+  public void testSetOutput() throws IOException {
+    Job job = new Job(new Configuration());
+    DBOutputFormat.setOutput(job, "hadoop_output", fieldNames);
+    
+    DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
+    String actual = format.constructQuery(dbConf.getOutputTableName()
+        , dbConf.getOutputFieldNames());
+    
+    assertEquals(expected, actual);
+    
+    job = new Job(new Configuration());
+    dbConf = new DBConfiguration(job.getConfiguration());
+    DBOutputFormat.setOutput(job, "hadoop_output", nullFieldNames.length);
+    assertNull(dbConf.getOutputFieldNames());
+    assertEquals(nullFieldNames.length, dbConf.getOutputFieldCount());
+    
+    actual = format.constructQuery(dbConf.getOutputTableName()
+        , new String[dbConf.getOutputFieldCount()]);
+    
+    assertEquals(nullExpected, actual);
+  }
+  
+}


