hadoop-mapreduce-commits mailing list archives

From e...@apache.org
Subject svn commit: r814656 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapreduce/lib/db/ src/test/mapred/org/apache/hadoop/mapreduce/lib/db/
Date Mon, 14 Sep 2009 14:21:52 GMT
Author: enis
Date: Mon Sep 14 14:21:51 2009
New Revision: 814656

URL: http://svn.apache.org/viewvc?rev=814656&view=rev
Log:
MAPREDUCE-885. More efficient SQL queries for DBInputFormat. Contributed by Aaron Kimball. 

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BigDecimalSplitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BooleanSplitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBSplitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=814656&r1=814655&r2=814656&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Sep 14 14:21:51 2009
@@ -337,6 +337,9 @@
     MAPREDUCE-856. Setup secure permissions for distributed cache files.
     (Vinod Kumar Vavilapalli via yhemanth)
 
+    MAPREDUCE-885. More efficient SQL queries for DBInputFormat. (Aaron Kimball 
+    via enis)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BigDecimalSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BigDecimalSplitter.java?rev=814656&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BigDecimalSplitter.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BigDecimalSplitter.java Mon Sep 14 14:21:51 2009
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.math.BigDecimal;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Implement DBSplitter over BigDecimal values.
+ */
+public class BigDecimalSplitter implements DBSplitter {
+  private static final Log LOG = LogFactory.getLog(BigDecimalSplitter.class);
+
+  public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
+      throws SQLException {
+
+    BigDecimal minVal = results.getBigDecimal(1);
+    BigDecimal maxVal = results.getBigDecimal(2);
+
+    String lowClausePrefix = colName + " >= ";
+    String highClausePrefix = colName + " < ";
+
+    BigDecimal numSplits = new BigDecimal(conf.getInt("mapred.map.tasks", 1));
+
+    if (minVal == null && maxVal == null) {
+      // Range is null to null. Return a null split accordingly.
+      List<InputSplit> splits = new ArrayList<InputSplit>();
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+      return splits;
+    }
+
+    if (minVal == null || maxVal == null) {
+      // Don't know what is a reasonable min/max value for interpolation. Fail.
+      LOG.error("Cannot find a range for NUMERIC or DECIMAL fields with one end NULL.");
+      return null;
+    }
+
+    // Get all the split points together.
+    List<BigDecimal> splitPoints = split(numSplits, minVal, maxVal);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    // Turn the split points into a set of intervals.
+    BigDecimal start = splitPoints.get(0);
+    for (int i = 1; i < splitPoints.size(); i++) {
+      BigDecimal end = splitPoints.get(i);
+
+      if (i == splitPoints.size() - 1) {
+        // This is the last one; use a closed interval.
+        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+            lowClausePrefix + start.toString(),
+            colName + " <= " + end.toString()));
+      } else {
+        // Normal open-interval case.
+        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+            lowClausePrefix + start.toString(),
+            highClausePrefix + end.toString()));
+      }
+
+      start = end;
+    }
+
+    return splits;
+  }
+
+  private static final BigDecimal MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE);
+
+  /**
+   * Divide numerator by denominator. If impossible in exact mode, use rounding.
+   */
+  protected BigDecimal tryDivide(BigDecimal numerator, BigDecimal denominator) {
+    try {
+      return numerator.divide(denominator);
+    } catch (ArithmeticException ae) {
+      return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
+    }
+  }
+
+  /**
+   * Returns a list of BigDecimals one element longer than the list of input splits.
+   * This represents the boundaries between input splits.
+   * All splits are open on the top end, except the last one.
+   *
+   * So the list [0, 5, 8, 12, 18] would represent splits capturing the intervals:
+   *
+   * [0, 5)
+   * [5, 8)
+   * [8, 12)
+   * [12, 18] note the closed interval for the last split.
+   */
+  List<BigDecimal> split(BigDecimal numSplits, BigDecimal minVal, BigDecimal maxVal)
+      throws SQLException {
+
+    List<BigDecimal> splits = new ArrayList<BigDecimal>();
+
+    // Use numSplits as a hint. May need an extra task if the size doesn't
+    // divide cleanly.
+
+    BigDecimal splitSize = tryDivide(maxVal.subtract(minVal), numSplits);
+    if (splitSize.compareTo(MIN_INCREMENT) < 0) {
+      splitSize = MIN_INCREMENT;
+      LOG.warn("Set BigDecimal splitSize to MIN_INCREMENT");
+    }
+
+    BigDecimal curVal = minVal;
+
+    while (curVal.compareTo(maxVal) <= 0) {
+      splits.add(curVal);
+      curVal = curVal.add(splitSize);
+    }
+
+    if (splits.get(splits.size() - 1).compareTo(maxVal) != 0 || splits.size() == 1) {
+      // We didn't end on the maxVal. Add that to the end of the list.
+      splits.add(maxVal);
+    }
+
+    return splits;
+  }
+}
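
[Editor's note] The boundary behaviour documented on split() above can be exercised with a small standalone sketch. This is a hypothetical demo class, not part of the commit; it must live in the same package, as the bundled tests do, because split() is package-private:

    package org.apache.hadoop.mapreduce.lib.db;

    import java.math.BigDecimal;
    import java.util.List;

    public class BigDecimalSplitterDemo {
      public static void main(String[] args) throws Exception {
        BigDecimalSplitter splitter = new BigDecimalSplitter();
        // Split [0, 100] four ways: the boundary list is one element longer
        // than the number of splits, and the last interval is closed.
        List<BigDecimal> bounds = splitter.split(
            new BigDecimal(4), BigDecimal.ZERO, new BigDecimal(100));
        System.out.println(bounds); // prints [0, 25, 50, 75, 100]
      }
    }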

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BooleanSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BooleanSplitter.java?rev=814656&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BooleanSplitter.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BooleanSplitter.java Mon Sep 14 14:21:51 2009
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Implement DBSplitter over boolean values.
+ */
+public class BooleanSplitter implements DBSplitter {
+  public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
+      throws SQLException {
+
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    if (results.getString(1) == null && results.getString(2) == null) {
+      // Range is null to null. Return a null split accordingly.
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+      return splits;
+    }
+
+    boolean minVal = results.getBoolean(1);
+    boolean maxVal = results.getBoolean(2);
+
+    // Use one or two splits.
+    if (!minVal) {
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " = FALSE", colName + " = FALSE"));
+    }
+
+    if (maxVal) {
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " = TRUE", colName + " = TRUE"));
+    }
+
+    if (results.getString(1) == null || results.getString(2) == null) {
+      // Include a null value.
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+    }
+
+    return splits;
+  }
+}

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java?rev=814656&r1=814655&r2=814656&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java Mon Sep 14 14:21:51 2009
@@ -77,6 +77,10 @@
   public static final String INPUT_COUNT_QUERY = 
       "mapred.jdbc.input.count.query";
   
+  /** Input query to get the min and max values of the jdbc.input.query */
+  public static final String INPUT_BOUNDING_QUERY =
+      "mapred.jdbc.input.bounding.query";
+  
   /** Class name implementing DBWritable which will hold input tuples */
   public static final String INPUT_CLASS_PROPERTY = "mapred.jdbc.input.class";
 
@@ -207,7 +211,17 @@
       conf.set(DBConfiguration.INPUT_COUNT_QUERY, query);
     }
   }
-  
+
+  public void setInputBoundingQuery(String query) {
+    if (query != null && query.length() > 0) {
+      conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query);
+    }
+  }
+
+  public String getInputBoundingQuery() {
+    return conf.get(DBConfiguration.INPUT_BOUNDING_QUERY);
+  }
+
   public Class<?> getInputClass() {
     return conf.getClass(DBConfiguration.INPUT_CLASS_PROPERTY,
                          NullDBWritable.class);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java?rev=814656&r1=814655&r2=814656&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java Mon Sep 14 14:21:51 2009
@@ -172,7 +172,15 @@
   public DBConfiguration getDBConf() {
     return dbConf;
   }
-  
+
+  public Connection getConnection() {
+    return connection;
+  }
+
+  public String getDBProductName() {
+    return dbProductName;
+  }
+
   protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
       Configuration conf) throws IOException {
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java?rev=814656&r1=814655&r2=814656&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java Mon Sep 14 14:21:51 2009
@@ -30,6 +30,8 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -49,6 +51,9 @@
  */
 public class DBRecordReader<T extends DBWritable> extends
     RecordReader<LongWritable, T> {
+
+  private static final Log LOG = LogFactory.getLog(DBRecordReader.class);
+
   private ResultSet results = null;
 
   private Class<T> inputClass;
@@ -102,7 +107,7 @@
   /** Returns the query for selecting the records, 
    * subclasses can override this for custom behaviour.*/
   protected String getSelectQuery() {
-      StringBuilder query = new StringBuilder();
+    StringBuilder query = new StringBuilder();
 
     // Default codepath for MySQL, HSQLDB, etc. Relies on LIMIT/OFFSET for splits.
     if(dbConf.getInputQuery() == null) {
@@ -254,4 +259,12 @@
   protected Connection getConnection() {
     return connection;
   }
+
+  protected PreparedStatement getStatement() {
+    return statement;
+  }
+
+  protected void setStatement(PreparedStatement stmt) {
+    this.statement = stmt;
+  }
 }

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBSplitter.java?rev=814656&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBSplitter.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBSplitter.java Mon Sep 14 14:21:51 2009
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * DBSplitter will generate DBInputSplits to use with DataDrivenDBInputFormat.
+ * DataDrivenDBInputFormat needs to interpolate between two values that
+ * represent the lowest and highest valued records to import. Depending
+ * on the data-type of the column, this requires different behavior.
+ * DBSplitter implementations should perform this for a data type or family
+ * of data types.
+ */
+public interface DBSplitter {
+  /**
+   * Given a ResultSet containing one record (and already advanced to that record)
+   * with two columns (a low value, and a high value, both of the same type), determine
+   * a set of splits that span the given values.
+   */
+  List<InputSplit> split(Configuration conf, ResultSet results, String colName) throws SQLException;
+}
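
[Editor's note] For reference, a minimal sketch of what an implementation of this contract can look like (a hypothetical class for illustration only; the splitters added in this commit are the real implementations):

    package org.apache.hadoop.mapreduce.lib.db;

    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.InputSplit;

    /** Degenerate splitter: one split spanning the whole [min, max] range. */
    public class SingleRangeSplitter implements DBSplitter {
      public List<InputSplit> split(Configuration conf, ResultSet results,
          String colName) throws SQLException {
        String minVal = results.getString(1); // low value is in column 1
        String maxVal = results.getString(2); // high value is in column 2
        List<InputSplit> splits = new ArrayList<InputSplit>();
        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
            colName + " >= " + minVal, colName + " <= " + maxVal));
        return splits;
      }
    }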

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java?rev=814656&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java Mon Sep 14 14:21:51 2009
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An InputFormat that reads input data from an SQL table.
+ * Operates like DBInputFormat, but instead of using LIMIT and OFFSET to demarcate
+ * splits, it tries to generate WHERE clauses which separate the data into roughly
+ * equivalent shards.
+ */
+public class DataDrivenDBInputFormat<T extends DBWritable>
+    extends DBInputFormat<T> implements Configurable {
+
+  private static final Log LOG = LogFactory.getLog(DataDrivenDBInputFormat.class);
+
+  /** If users are providing their own query, the following string is expected to
+      appear in the WHERE clause, which will be substituted with a pair of conditions
+      on the input to allow input splits to parallelise the import. */
+  public static final String SUBSTITUTE_TOKEN = "$CONDITIONS";
+
+  /**
+   * An InputSplit that spans a set of rows
+   */
+  public static class DataDrivenDBInputSplit extends DBInputFormat.DBInputSplit {
+
+    private String lowerBoundClause;
+    private String upperBoundClause;
+
+    /**
+     * Default Constructor
+     */
+    public DataDrivenDBInputSplit() {
+    }
+
+    /**
+     * Convenience Constructor
+     * @param lower the string to be put in the WHERE clause to guard on the 'lower' end
+     * @param upper the string to be put in the WHERE clause to guard on the 'upper' end
+     */
+    public DataDrivenDBInputSplit(final String lower, final String upper) {
+      this.lowerBoundClause = lower;
+      this.upperBoundClause = upper;
+    }
+
+
+    /**
+     * @return The total row count in this split
+     */
+    public long getLength() throws IOException {
+      return 0; // unfortunately, we don't know this.
+    }
+
+    /** {@inheritDoc} */
+    public void readFields(DataInput input) throws IOException {
+      this.lowerBoundClause = Text.readString(input);
+      this.upperBoundClause = Text.readString(input);
+    }
+
+    /** {@inheritDoc} */
+    public void write(DataOutput output) throws IOException {
+      Text.writeString(output, this.lowerBoundClause);
+      Text.writeString(output, this.upperBoundClause);
+    }
+
+    public String getLowerClause() {
+      return lowerBoundClause;
+    }
+
+    public String getUpperClause() {
+      return upperBoundClause;
+    }
+  }
+
+  /**
+   * @return the DBSplitter implementation to use to divide the table/query into InputSplits.
+   */
+  protected DBSplitter getSplitter(int sqlDataType) {
+    switch (sqlDataType) {
+    case Types.NUMERIC:
+    case Types.DECIMAL:
+      return new BigDecimalSplitter();
+
+    case Types.BIT:
+    case Types.BOOLEAN:
+      return new BooleanSplitter();
+
+    case Types.INTEGER:
+    case Types.TINYINT:
+    case Types.SMALLINT:
+    case Types.BIGINT:
+      return new IntegerSplitter();
+
+    case Types.REAL:
+    case Types.FLOAT:
+    case Types.DOUBLE:
+      return new FloatSplitter();
+
+    case Types.CHAR:
+    case Types.VARCHAR:
+    case Types.LONGVARCHAR:
+      return new TextSplitter();
+
+    case Types.DATE:
+    case Types.TIME:
+    case Types.TIMESTAMP:
+      return new DateSplitter();
+
+    default:
+      // TODO: Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB, BLOB, ARRAY
+      // STRUCT, REF, DATALINK, and JAVA_OBJECT.
+      return null;
+    }
+  }
+
+  /** {@inheritDoc} */
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+
+    ResultSet results = null;
+    Statement statement = null;
+    try {
+      statement = getConnection().createStatement();
+
+      results = statement.executeQuery(getBoundingValsQuery());
+      results.next();
+
+      // Based on the type of the results, use a different mechanism
+      // for interpolating split points (i.e., numeric splits, text splits,
+      // dates, etc.)
+      int sqlDataType = results.getMetaData().getColumnType(1);
+      DBSplitter splitter = getSplitter(sqlDataType);
+      if (null == splitter) {
+        throw new IOException("Unknown SQL data type: " + sqlDataType);
+      }
+
+      return splitter.split(job.getConfiguration(), results, getDBConf().getInputOrderBy());
+    } catch (SQLException e) {
+      throw new IOException(e.getMessage());
+    } finally {
+      // More-or-less ignore SQL exceptions here, but log in case we need it.
+      try {
+        if (null != results) {
+          results.close();
+        }
+      } catch (SQLException se) {
+        LOG.debug("SQLException closing resultset: " + se.toString());
+      }
+
+      try {
+        if (null != statement) {
+          statement.close();
+        }
+      } catch (SQLException se) {
+        LOG.debug("SQLException closing statement: " + se.toString());
+      }
+
+      try {
+        getConnection().commit();
+      } catch (SQLException se) {
+        LOG.debug("SQLException committing split transaction: " + se.toString());
+      }
+    }
+  }
+
+  /**
+   * @return a query which returns the minimum and maximum values for
+   * the order-by column.
+   *
+   * The min value should be in the first column, and the
+   * max value should be in the second column of the results.
+   */
+  protected String getBoundingValsQuery() {
+    // If the user has provided a query, use that instead.
+    String userQuery = getDBConf().getInputBoundingQuery();
+    if (null != userQuery) {
+      return userQuery;
+    }
+
+    // Auto-generate one based on the table name we've been provided with.
+    StringBuilder query = new StringBuilder();
+
+    String splitCol = getDBConf().getInputOrderBy();
+    query.append("SELECT MIN(").append(splitCol).append("), ");
+    query.append("MAX(").append(splitCol).append(") FROM ");
+    query.append(getDBConf().getInputTableName());
+    String conditions = getDBConf().getInputConditions();
+    if (null != conditions) {
+      query.append(" WHERE ( " + conditions + " )");
+    }
+
+    return query.toString();
+  }
+
+  /** Set the user-defined bounding query to use with a user-defined query.
+      This *must* include the substring "$CONDITIONS"
+      (DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) inside the WHERE clause,
+      so that DataDrivenDBInputFormat knows where to insert split clauses.
+      e.g., "SELECT foo FROM mytable WHERE $CONDITIONS"
+      This will be expanded to something like:
+        SELECT foo FROM mytable WHERE (id &gt; 100) AND (id &lt; 250)
+      inside each split.
+    */
+  public static void setBoundingQuery(Configuration conf, String query) {
+    if (null != query) {
+      // If the user is setting a query, warn if it lacks the substitution token.
+      if (query.indexOf(SUBSTITUTE_TOKEN) == -1) {
+        LOG.warn("Could not find " + SUBSTITUTE_TOKEN + " token in query: " + query
+            + "; splits may not partition data.");
+      }
+    }
+
+    conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query);
+  }
+
+  protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
+      Configuration conf) throws IOException {
+
+    DBConfiguration dbConf = getDBConf();
+    @SuppressWarnings("unchecked")
+    Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
+    String dbProductName = getDBProductName();
+
+    try {
+      // use database product name to determine appropriate record reader.
+      if (dbProductName.startsWith("MYSQL")) {
+        // use MySQL-specific db reader.
+        return new MySQLDataDrivenDBRecordReader<T>(split, inputClass,
+            conf, getConnection(), dbConf, dbConf.getInputConditions(), dbConf.getInputFieldNames(),
+            dbConf.getInputTableName());
+      } else {
+        // Generic reader.
+        return new DataDrivenDBRecordReader<T>(split, inputClass,
+            conf, getConnection(), dbConf, dbConf.getInputConditions(), dbConf.getInputFieldNames(),
+            dbConf.getInputTableName());
+      }
+    } catch (SQLException ex) {
+      throw new IOException(ex.getMessage());
+    }
+  }
+
+  // Configuration methods override superclass to ensure that the proper
+  // DataDrivenDBInputFormat gets used.
+
+  /** Note that the "orderBy" column is called the "splitBy" in this version.
+    * We reuse the same configuration field, but it is used to partition the
+    * results rather than to strictly order them.
+    */
+  public static void setInput(Job job, 
+      Class<? extends DBWritable> inputClass,
+      String tableName,String conditions, 
+      String splitBy, String... fieldNames) {
+    DBInputFormat.setInput(job, inputClass, tableName, conditions, splitBy, fieldNames);
+    job.setInputFormatClass(DataDrivenDBInputFormat.class);
+  }
+
+  /** setInput() takes a custom query and a separate "bounding query" to use
+      instead of the custom "count query" used by DBInputFormat.
+    */
+  public static void setInput(Job job,
+      Class<? extends DBWritable> inputClass,
+      String inputQuery, String inputBoundingQuery) {
+    DBInputFormat.setInput(job, inputClass, inputQuery, "");
+    job.getConfiguration().set(DBConfiguration.INPUT_BOUNDING_QUERY, inputBoundingQuery);
+    job.setInputFormatClass(DataDrivenDBInputFormat.class);
+  }
+}
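
[Editor's note] Putting the two setInput() overloads together, a job might be wired up along these lines. This is a hedged sketch, not part of the commit: the table, column, driver, and MyRecord class (assumed to implement DBWritable) are invented for illustration, and the fragment belongs inside a method that throws IOException:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;

    Job job = new Job(new Configuration());
    DBConfiguration.configureDB(job.getConfiguration(),
        "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/mydb");

    // Table-based form: "id" goes in the order-by slot but is used as the
    // split-by column (partitioning, not ordering).
    DataDrivenDBInputFormat.setInput(job, MyRecord.class,
        "employees", null /* conditions */, "id", "id", "name", "salary");

    // Or the query-based form: $CONDITIONS marks where each split's WHERE
    // clause is substituted, and the bounding query supplies MIN/MAX.
    DataDrivenDBInputFormat.setInput(job, MyRecord.class,
        "SELECT id, name FROM employees WHERE $CONDITIONS",
        "SELECT MIN(id), MAX(id) FROM employees");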

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java?rev=814656&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java Mon Sep 14 14:21:51 2009
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A RecordReader that reads records from a SQL table,
+ * using data-driven WHERE clause splits.
+ * Emits LongWritables containing the record number as
+ * key and DBWritables as value.
+ */
+public class DataDrivenDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {
+
+  private static final Log LOG = LogFactory.getLog(DataDrivenDBRecordReader.class);
+
+  /**
+   * @param split The InputSplit to read data for
+   * @throws SQLException 
+   */
+  public DataDrivenDBRecordReader(DBInputFormat.DBInputSplit split,
+      Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
+      String cond, String [] fields, String table)
+      throws SQLException {
+    super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
+  }
+
+  /** Returns the query for selecting the records,
+   * subclasses can override this for custom behaviour.*/
+  @SuppressWarnings("unchecked")
+  protected String getSelectQuery() {
+    StringBuilder query = new StringBuilder();
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit dataSplit =
+        (DataDrivenDBInputFormat.DataDrivenDBInputSplit) getSplit();
+    DBConfiguration dbConf = getDBConf();
+    String [] fieldNames = getFieldNames();
+    String tableName = getTableName();
+    String conditions = getConditions();
+
+    // Build the WHERE clauses associated with the data split first.
+    // We need them in both branches of this function.
+    StringBuilder conditionClauses = new StringBuilder();
+    conditionClauses.append("( ").append(dataSplit.getLowerClause());
+    conditionClauses.append(" ) AND ( ").append(dataSplit.getUpperClause());
+    conditionClauses.append(" )");
+
+    if(dbConf.getInputQuery() == null) {
+      // We need to generate the entire query.
+      query.append("SELECT ");
+
+      for (int i = 0; i < fieldNames.length; i++) {
+        query.append(fieldNames[i]);
+        if (i != fieldNames.length - 1) {
+          query.append(", ");
+        }
+      }
+
+      query.append(" FROM ").append(tableName);
+      query.append(" AS ").append(tableName); //in hsqldb this is necessary
+      query.append(" WHERE ");
+      if (conditions != null && conditions.length() > 0) {
+        // Put the user's conditions first.
+        query.append("( ").append(conditions).append(" ) AND ");
+      }
+
+      // Now append the conditions associated with our split.
+      query.append(conditionClauses.toString());
+
+    } else {
+      // User provided the query. We replace the special token with our WHERE clause.
+      String inputQuery = dbConf.getInputQuery();
+      if (inputQuery.indexOf(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) == -1) {
+        LOG.error("Could not find the clause substitution token "
+            + DataDrivenDBInputFormat.SUBSTITUTE_TOKEN + " in the query: ["
+            + inputQuery + "]. Parallel splits may not work correctly.");
+      }
+
+      query.append(inputQuery.replace(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN,
+          conditionClauses.toString()));
+    }
+
+    return query.toString();
+  }
+}
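
[Editor's note] Concretely, for a table-based configuration with fields "id, name", table "employees", no extra conditions, and a split whose bounds are "id >= 0" and "id < 12", getSelectQuery() above would emit a query of the form (illustrative values):

    SELECT id, name FROM employees AS employees WHERE ( id >= 0 ) AND ( id < 12 )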

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java?rev=814656&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java Mon Sep 14 14:21:51 2009
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Implement DBSplitter over date/time values.
+ * Make use of logic from IntegerSplitter, since date/time are just longs
+ * in Java.
+ */
+public class DateSplitter extends IntegerSplitter {
+
+  private static final Log LOG = LogFactory.getLog(DateSplitter.class);
+
+  public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
+      throws SQLException {
+
+    long minVal;
+    long maxVal;
+
+    int sqlDataType = results.getMetaData().getColumnType(1);
+    minVal = resultSetColToLong(results, 1, sqlDataType);
+    maxVal = resultSetColToLong(results, 2, sqlDataType);
+
+    String lowClausePrefix = colName + " >= '";
+    String highClausePrefix = colName + " < '";
+
+    int numSplits = conf.getInt("mapred.map.tasks", 1);
+    if (numSplits < 1) {
+      numSplits = 1;
+    }
+
+    if (minVal == Long.MIN_VALUE && maxVal == Long.MIN_VALUE) {
+      // The range of acceptable dates is NULL to NULL. Just create a single split.
+      List<InputSplit> splits = new ArrayList<InputSplit>();
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+      return splits;
+    }
+
+    // Gather the split point integers
+    List<Long> splitPoints = split(numSplits, minVal, maxVal);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    // Turn the split points into a set of intervals.
+    long start = splitPoints.get(0);
+    Date startDate = longToDate(start, sqlDataType);
+    if (sqlDataType == Types.TIMESTAMP) {
+      // The lower bound's nanos value needs to match the actual lower-bound nanos.
+      try {
+        ((java.sql.Timestamp) startDate).setNanos(results.getTimestamp(1).getNanos());
+      } catch (NullPointerException npe) {
+        // If the lower bound was NULL, we'll get an NPE; just ignore it and don't set nanos.
+      }
+    }
+
+    for (int i = 1; i < splitPoints.size(); i++) {
+      long end = splitPoints.get(i);
+      Date endDate = longToDate(end, sqlDataType);
+
+      if (i == splitPoints.size() - 1) {
+        if (sqlDataType == Types.TIMESTAMP) {
+          // The upper bound's nanos value needs to match the actual upper-bound nanos.
+          try {
+            ((java.sql.Timestamp) endDate).setNanos(results.getTimestamp(2).getNanos());
+          } catch (NullPointerException npe) {
+            // If the upper bound was NULL, we'll get an NPE; just ignore it and don't set nanos.
+          }
+        }
+        // This is the last one; use a closed interval.
+        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+            lowClausePrefix + startDate.toString() + "'",
+            colName + " <= '" + endDate.toString() + "'"));
+      } else {
+        // Normal open-interval case.
+        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+            lowClausePrefix + startDate.toString() + "'",
+            highClausePrefix + endDate.toString() + "'"));
+      }
+
+      start = end;
+      startDate = endDate;
+    }
+
+    if (minVal == Long.MIN_VALUE || maxVal == Long.MIN_VALUE) {
+      // Add an extra split to handle the null case that we saw.
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+    }
+
+    return splits;
+  }
+
+  /** Retrieve the value from the column in a type-appropriate manner and return
+      its timestamp since the epoch. If the column is null, then return Long.MIN_VALUE.
+      This will cause a special split to be generated for the NULL case, but may also
+      cause poorly-balanced splits if most of the actual dates are positive time
+      since the epoch, etc.
+    */
+  private long resultSetColToLong(ResultSet rs, int colNum, int sqlDataType) throws SQLException {
+    try {
+      switch (sqlDataType) {
+      case Types.DATE:
+        return rs.getDate(colNum).getTime();
+      case Types.TIME:
+        return rs.getTime(colNum).getTime();
+      case Types.TIMESTAMP:
+        return rs.getTimestamp(colNum).getTime();
+      default:
+        throw new SQLException("Not a date-type field");
+      }
+    } catch (NullPointerException npe) {
+      // null column. return minimum long value.
+      LOG.warn("Encountered a NULL date in the split column. Splits may be poorly balanced.");
+      return Long.MIN_VALUE;
+    }
+  }
+
+  /**  Parse the long-valued timestamp into the appropriate SQL date type. */
+  private Date longToDate(long val, int sqlDataType) {
+    switch (sqlDataType) {
+    case Types.DATE:
+      return new java.sql.Date(val);
+    case Types.TIME:
+      return new java.sql.Time(val);
+    case Types.TIMESTAMP:
+      return new java.sql.Timestamp(val);
+    default: // Shouldn't ever hit this case.
+      return null;
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java?rev=814656&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java Mon Sep 14 14:21:51 2009
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Implement DBSplitter over floating-point values.
+ */
+public class FloatSplitter implements DBSplitter {
+
+  private static final double MIN_INCREMENT = 10000 * Double.MIN_VALUE;
+
+  public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
+      throws SQLException {
+
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    if (results.getString(1) == null && results.getString(2) == null) {
+      // Range is null to null. Return a null split accordingly.
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+      return splits;
+    }
+
+    double minVal = results.getDouble(1);
+    double maxVal = results.getDouble(2);
+
+    // Use this as a hint. May need an extra task if the size doesn't
+    // divide cleanly.
+    int numSplits = conf.getInt("mapred.map.tasks", 1);
+    double splitSize = (maxVal - minVal) / (double) numSplits;
+
+    if (splitSize < MIN_INCREMENT) {
+      splitSize = MIN_INCREMENT;
+    }
+
+    String lowClausePrefix = colName + " >= ";
+    String highClausePrefix = colName + " < ";
+
+    double curLower = minVal;
+    double curUpper = curLower + splitSize;
+
+    while (curUpper < maxVal) {
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          lowClausePrefix + Double.toString(curLower),
+          highClausePrefix + Double.toString(curUpper)));
+
+      curLower = curUpper;
+      curUpper += splitSize;
+    }
+
+    // Catch any overage and create the closed interval for the last split.
+    if (curLower <= maxVal || splits.size() == 1) {
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          lowClausePrefix + Double.toString(curLower),
+          colName + " <= " + Double.toString(maxVal)));
+    }
+
+    if (results.getString(1) == null || results.getString(2) == null) {
+      // At least one extrema is null; add a null split.
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+    }
+
+    return splits;
+  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java?rev=814656&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java Mon Sep 14 14:21:51 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Implement DBSplitter over integer values.
+ */
+public class IntegerSplitter implements DBSplitter {
+  public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
+      throws SQLException {
+
+    long minVal = results.getLong(1);
+    long maxVal = results.getLong(2);
+
+    String lowClausePrefix = colName + " >= ";
+    String highClausePrefix = colName + " < ";
+
+    int numSplits = conf.getInt("mapred.map.tasks", 1);
+    if (numSplits < 1) {
+      numSplits = 1;
+    }
+
+    if (results.getString(1) == null && results.getString(2) == null) {
+      // Range is null to null. Return a null split accordingly.
+      List<InputSplit> splits = new ArrayList<InputSplit>();
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+      return splits;
+    }
+
+    // Get all the split points together.
+    List<Long> splitPoints = split(numSplits, minVal, maxVal);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    // Turn the split points into a set of intervals.
+    long start = splitPoints.get(0);
+    for (int i = 1; i < splitPoints.size(); i++) {
+      long end = splitPoints.get(i);
+
+      if (i == splitPoints.size() - 1) {
+        // This is the last one; use a closed interval.
+        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+            lowClausePrefix + Long.toString(start),
+            colName + " <= " + Long.toString(end)));
+      } else {
+        // Normal open-interval case.
+        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+            lowClausePrefix + Long.toString(start),
+            highClausePrefix + Long.toString(end)));
+      }
+
+      start = end;
+    }
+
+    if (results.getString(1) == null || results.getString(2) == null) {
+      // At least one extrema is null; add a null split.
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+    }
+
+    return splits;
+  }
+
+  /**
+   * Returns a list of longs one element longer than the list of input splits.
+   * This represents the boundaries between input splits.
+   * All splits are open on the top end, except the last one.
+   *
+   * So the list [0, 5, 8, 12, 18] would represent splits capturing the intervals:
+   *
+   * [0, 5)
+   * [5, 8)
+   * [8, 12)
+   * [12, 18] note the closed interval for the last split.
+   */
+  List<Long> split(long numSplits, long minVal, long maxVal)
+      throws SQLException {
+
+    List<Long> splits = new ArrayList<Long>();
+
+    // Use numSplits as a hint. May need an extra task if the size doesn't
+    // divide cleanly.
+
+    long splitSize = (maxVal - minVal) / numSplits;
+    if (splitSize < 1) {
+      splitSize = 1;
+    }
+
+    long curVal = minVal;
+
+    while (curVal <= maxVal) {
+      splits.add(curVal);
+      curVal += splitSize;
+    }
+
+    if (splits.get(splits.size() - 1) != maxVal || splits.size() == 1) {
+      // We didn't end on the maxVal. Add that to the end of the list.
+      splits.add(maxVal);
+    }
+
+    return splits;
+  }
+}

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java?rev=814656&r1=814655&r2=814656&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java Mon Sep 14 14:21:51 2009
@@ -41,6 +41,7 @@
     PreparedStatement statement = getConnection().prepareStatement(query,
       ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
     statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time.
+    setStatement(statement); // save a ref for cleanup in close()
     return statement.executeQuery();
   }
 }

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java?rev=814656&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java Mon Sep 14 14:21:51 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A RecordReader that reads records from a MySQL table via DataDrivenDBRecordReader
+ */
+public class MySQLDataDrivenDBRecordReader<T extends DBWritable>
+    extends DataDrivenDBRecordReader<T> {
+
+  public MySQLDataDrivenDBRecordReader(DBInputFormat.DBInputSplit split,
+      Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
+      String cond, String [] fields, String table) throws SQLException {
+    super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
+  }
+
+  // Execute statements for mysql in unbuffered mode.
+  protected ResultSet executeQuery(String query) throws SQLException {
+    PreparedStatement statement = getConnection().prepareStatement(query,
+      ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time.
+    setStatement(statement); // save a ref so the close() method cleans this up.
+    return statement.executeQuery();
+  }
+}
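
[Editor's note] Setting the fetch size to Integer.MIN_VALUE on a TYPE_FORWARD_ONLY, CONCUR_READ_ONLY statement is the MySQL Connector/J convention for streaming results row by row instead of buffering the entire result set in memory; saving the statement via setStatement() lets the inherited close() method release it.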

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java?rev=814656&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java Mon Sep 14 14:21:51 2009
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Implement DBSplitter over text strings.
+ */
+public class TextSplitter extends BigDecimalSplitter {
+
+  private static final Log LOG = LogFactory.getLog(TextSplitter.class);
+
+  /**
+   * This method needs to determine the splits between two user-provided strings.
+   * In the case where the user's strings are 'A' and 'Z', this is not hard; we 
+   * could create two splits from ['A', 'M') and ['M', 'Z'], 26 splits for strings
+   * beginning with each letter, etc.
+   *
+   * If a user has provided us with the strings "Ham" and "Haze", however, we need
+   * to create splits that differ in the third letter.
+   *
+   * The algorithm used is as follows:
+   * Since there are 2**16 unicode characters, we interpret characters as digits in
+   * base 65536. Given a string 's' containing characters s_0, s_1 .. s_n, we interpret
+   * the string as the number: 0.s_0 s_1 s_2.. s_n in base 65536. Having mapped the
+   * low and high strings into fractional BigDecimal values, we then use the
+   * BigDecimalSplitter to establish even split points, then map the resulting values
+   * back into strings.
+   */
+  public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
+      throws SQLException {
+
+    LOG.warn("Generating splits for a textual index column.");
+    LOG.warn("If your database sorts in a case-insensitive order, "
+        + "this may result in a partial import or duplicate records.");
+    LOG.warn("You are strongly encouraged to choose a numeric split column.");
+
+    String minString = results.getString(1);
+    String maxString = results.getString(2);
+
+    boolean minIsNull = false;
+
+    // If the min value is null, switch it to an empty string instead for purposes
+    // of interpolation. Then add [null, null] as a special case split.
+    if (null == minString) {
+      minString = "";
+      minIsNull = true;
+    }
+
+    if (null == maxString) {
+      // If the max string is null, then the min string has to be null too.
+      // Just return a special split for this case.
+      List<InputSplit> splits = new ArrayList<InputSplit>();
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+      return splits;
+    }
+
+    // Use this as a hint. May need an extra task if the size doesn't
+    // divide cleanly.
+    int numSplits = conf.getInt("mapred.map.tasks", 1);
+
+    String lowClausePrefix = colName + " >= '";
+    String highClausePrefix = colName + " < '";
+
+    // If there is a common prefix between minString and maxString, establish it
+    // and pull it out of minString and maxString.
+    int maxPrefixLen = Math.min(minString.length(), maxString.length());
+    int sharedLen;
+    for (sharedLen = 0; sharedLen < maxPrefixLen; sharedLen++) {
+      char c1 = minString.charAt(sharedLen);
+      char c2 = maxString.charAt(sharedLen);
+      if (c1 != c2) {
+        break;
+      }
+    }
+
+    // The common prefix has length 'sharedLen'. Extract it from both.
+    String commonPrefix = minString.substring(0, sharedLen);
+    minString = minString.substring(sharedLen);
+    maxString = maxString.substring(sharedLen);
+
+    List<String> splitStrings = split(numSplits, minString, maxString, commonPrefix);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    // Convert the list of split point strings into an actual set of InputSplits.
+    String start = splitStrings.get(0);
+    for (int i = 1; i < splitStrings.size(); i++) {
+      String end = splitStrings.get(i);
+
+      if (i == splitStrings.size() - 1) {
+        // This is the last one; use a closed interval.
+        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+            lowClausePrefix + start + "'", colName + " <= '" + end + "'"));
+      } else {
+        // Normal open-interval case.
+        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+            lowClausePrefix + start + "'", highClausePrefix + end + "'"));
+      }
+    }
+
+    if (minIsNull) {
+      // Add the special null split at the end.
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+    }
+
+    return splits;
+  }
+
+  List<String> split(int numSplits, String minString, String maxString, String commonPrefix)
+      throws SQLException {
+
+    BigDecimal minVal = stringToBigDecimal(minString);
+    BigDecimal maxVal = stringToBigDecimal(maxString);
+
+    List<BigDecimal> splitPoints = split(new BigDecimal(numSplits), minVal, maxVal);
+    List<String> splitStrings = new ArrayList<String>();
+
+    // Convert the BigDecimal splitPoints into their string representations.
+    for (BigDecimal bd : splitPoints) {
+      splitStrings.add(commonPrefix + bigDecimalToString(bd));
+    }
+
+    // Make sure that our user-specified boundaries are the first and last entries
+    // in the array.
+    if (splitStrings.size() == 0 || !splitStrings.get(0).equals(commonPrefix + minString)) {
+      splitStrings.add(0, commonPrefix + minString);
+    }
+    if (splitStrings.size() == 1
+        || !splitStrings.get(splitStrings.size() - 1).equals(commonPrefix + maxString)) {
+      splitStrings.add(commonPrefix + maxString);
+    }
+
+    return splitStrings;
+  }
+
+  private static final BigDecimal ONE_PLACE = new BigDecimal(65536);
+
+  // Maximum number of characters to convert. This prevents rounding errors
+  // and repeating fractions in the low-order places from getting out of
+  // control. Note that this still permits a huge number of possible splits.
+  private static final int MAX_CHARS = 8;
+
+  /**
+   * Return a BigDecimal representation of string 'str' suitable for use
+   * in a numerically-sorting order.
+   */
+  BigDecimal stringToBigDecimal(String str) {
+    BigDecimal result = BigDecimal.ZERO;
+    BigDecimal curPlace = ONE_PLACE; // dividing by 65536 gives the first digit a weight of 1/65536.
+
+    int len = Math.min(str.length(), MAX_CHARS);
+
+    for (int i = 0; i < len; i++) {
+      int codePoint = str.codePointAt(i);
+      result = result.add(tryDivide(new BigDecimal(codePoint), curPlace));
+      // advance to the next less significant place. e.g., 1/(65536^2) for the second char.
+      curPlace = curPlace.multiply(ONE_PLACE);
+    }
+
+    return result;
+  }
+
+  /**
+   * Return the string encoded in a BigDecimal.
+   * Repeatedly multiply the input value by 65536; the integer portion after each
+   * multiplication represents a single character in base 65536. Convert each back
+   * into a char and append it to the result until no data remains.
+   */
+  String bigDecimalToString(BigDecimal bd) {
+    BigDecimal cur = bd.stripTrailingZeros();
+    StringBuilder sb = new StringBuilder();
+
+    for (int numConverted = 0; numConverted < MAX_CHARS; numConverted++) {
+      cur = cur.multiply(ONE_PLACE);
+      int curCodePoint = cur.intValue();
+      if (0 == curCodePoint) {
+        break;
+      }
+
+      cur = cur.subtract(new BigDecimal(curCodePoint));
+      sb.append(Character.toChars(curCodePoint));
+    }
+
+    return sb.toString();
+  }
+}
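To make the base-65536 encoding above concrete: 'A' is code point 65, so
stringToBigDecimal("A") yields 65/65536 = 0.0009918212890625, and "AB" adds
66/65536^2 for its second character. The sketch below mirrors (rather than
reuses) that encoding; the class and method names are invented for
illustration:

    import java.math.BigDecimal;
    import java.math.MathContext;

    public class Base65536Demo {
      private static final BigDecimal ONE_PLACE = new BigDecimal(65536);
      private static final int MAX_CHARS = 8;

      // Mirrors TextSplitter.stringToBigDecimal: each character is a
      // base-65536 digit to the right of the radix point.
      static BigDecimal encode(String s) {
        BigDecimal result = BigDecimal.ZERO;
        BigDecimal place = ONE_PLACE;
        for (int i = 0; i < Math.min(s.length(), MAX_CHARS); i++) {
          BigDecimal digit = new BigDecimal(s.codePointAt(i));
          result = result.add(digit.divide(place, new MathContext(40)));
          place = place.multiply(ONE_PLACE);
        }
        return result;
      }

      public static void main(String[] args) {
        System.out.println(encode("A"));  // 0.0009918212890625
        System.out.println(encode("B"));  // 0.001007080078125
        // Midpoints between such values decode back into boundary strings,
        // which is how evenly-spaced splits become WHERE-clause bounds.
      }
    }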

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java?rev=814656&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java Mon Sep 14 14:21:51 2009
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+public class TestIntegerSplitter extends TestCase {
+  private long [] toLongArray(List<Long> in) {
+    long [] out = new long[in.size()];
+    for (int i = 0; i < in.size(); i++) {
+      out[i] = in.get(i).longValue();
+    }
+
+    return out;
+  }
+
+  public String formatLongArray(long [] ar) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    boolean first = true;
+    for (long val : ar) {
+      if (!first) {
+        sb.append(", ");
+      }
+
+      sb.append(Long.toString(val));
+      first = false;
+    }
+
+    sb.append("]");
+    return sb.toString();
+  }
+
+  public void assertLongArrayEquals(long [] expected, long [] actual) {
+    for (int i = 0; i < expected.length; i++) {
+      try {
+        assertEquals("Failure at position " + i + "; got " + actual[i]
+            + " instead of " + expected[i] + "; actual array is " + formatLongArray(actual),
+            expected[i], actual[i]);
+      } catch (ArrayIndexOutOfBoundsException oob) {
+        fail("Expected array with " + expected.length + " elements; got " + actual.length
+            + ". Actual array is " + formatLongArray(actual));
+      }
+    }
+
+    if (actual.length > expected.length) {
+      fail("Actual array has " + actual.length + " elements; expected " + expected.length
+          + ". ACtual array is " + formatLongArray(actual));
+    }
+  }
+
+  public void testEvenSplits() throws SQLException {
+    List<Long> splits = new IntegerSplitter().split(10, 0, 100);
+    long [] expected = { 0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100 };
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
+
+  public void testOddSplits() throws SQLException {
+    List<Long> splits = new IntegerSplitter().split(10, 0, 95);
+    long [] expected = { 0, 9, 18, 27, 36, 45, 54, 63, 72, 81, 90, 95 };
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
+
+  public void testSingletonSplit() throws SQLException {
+    List<Long> splits = new IntegerSplitter().split(1, 5, 5);
+    long [] expected = { 5, 5 };
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
+
+  public void testSingletonSplit2() throws SQLException {
+    // Same test, but with an overly high numSplits.
+    List<Long> splits = new IntegerSplitter().split(5, 5, 5);
+    long [] expected = { 5, 5 };
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
+
+  public void testTooManySplits() throws SQLException {
+    List<Long> splits = new IntegerSplitter().split(5, 3, 5);
+    long [] expected = { 3, 4, 5 };
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
+
+}
+
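Taken together, these tests pin down IntegerSplitter's contract: split(numSplits,
min, max) returns an ascending list of boundary values that starts at min, ends
at max, steps by roughly (max - min) / numSplits, and closes with a short final
step when the range does not divide evenly. A sketch consistent with every
expectation above (but not the committed IntegerSplitter itself, which may
handle further edge cases) would be:

    import java.util.ArrayList;
    import java.util.List;

    public class SplitPointsSketch {
      static List<Long> splitPoints(int numSplits, long min, long max) {
        // A step of at least 1 guards against numSplits > (max - min).
        long step = Math.max(1, (max - min) / numSplits);
        List<Long> points = new ArrayList<Long>();
        for (long cur = min; cur < max; cur += step) {
          points.add(cur);
        }
        points.add(max);  // always close the range at max
        return points;
      }

      public static void main(String[] args) {
        System.out.println(splitPoints(10, 0, 100)); // [0, 10, ..., 90, 100]
        System.out.println(splitPoints(10, 0, 95));  // [0, 9, 18, ..., 90, 95]
        System.out.println(splitPoints(1, 5, 5));    // [5, 5]
      }
    }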

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java?rev=814656&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java Mon Sep 14 14:21:51 2009
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+public class TestTextSplitter extends TestCase {
+
+  public String formatArray(Object [] ar) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    boolean first = true;
+    for (Object val : ar) {
+      if (!first) {
+        sb.append(", ");
+      }
+
+      sb.append(val.toString());
+      first = false;
+    }
+
+    sb.append("]");
+    return sb.toString();
+  }
+
+  public void assertArrayEquals(Object [] expected, Object [] actual) {
+    for (int i = 0; i < expected.length; i++) {
+      try {
+        assertEquals("Failure at position " + i + "; got " + actual[i]
+            + " instead of " + expected[i] + "; actual array is " + formatArray(actual),
+            expected[i], actual[i]);
+      } catch (ArrayIndexOutOfBoundsException oob) {
+        fail("Expected array with " + expected.length + " elements; got " + actual.length
+            + ". Actual array is " + formatArray(actual));
+      }
+    }
+
+    if (actual.length > expected.length) {
+      fail("Actual array has " + actual.length + " elements; expected " + expected.length
+          + ". Actual array is " + formatArray(actual));
+    }
+  }
+
+  public void testStringConvertEmpty() {
+    TextSplitter splitter = new TextSplitter();
+    BigDecimal emptyBigDec = splitter.stringToBigDecimal("");
+    assertEquals(BigDecimal.ZERO, emptyBigDec);
+  }
+
+  public void testBigDecConvertEmpty() {
+    TextSplitter splitter = new TextSplitter();
+    String emptyStr = splitter.bigDecimalToString(BigDecimal.ZERO);
+    assertEquals("", emptyStr);
+  }
+
+  public void testConvertA() {
+    TextSplitter splitter = new TextSplitter();
+    String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("A"));
+    assertEquals("A", out);
+  }
+
+  public void testConvertZ() {
+    TextSplitter splitter = new TextSplitter();
+    String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("Z"));
+    assertEquals("Z", out);
+  }
+
+  public void testConvertThreeChars() {
+    TextSplitter splitter = new TextSplitter();
+    String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("abc"));
+    assertEquals("abc", out);
+  }
+
+  public void testConvertStr() {
+    TextSplitter splitter = new TextSplitter();
+    String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("big str"));
+    assertEquals("big str", out);
+  }
+
+  public void testConvertChomped() {
+    TextSplitter splitter = new TextSplitter();
+    String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("AVeryLongStringIndeed"));
+    assertEquals("AVeryLon", out);
+  }
+
+  public void testAlphabetSplit() throws SQLException {
+    // Splitting 'A'..'Z' into 25 intervals should yield 26 split points, one per letter.
+    TextSplitter splitter = new TextSplitter();
+    List<String> splits = splitter.split(25, "A", "Z", "");
+    String [] expected = { "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K",
+        "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z" };
+    assertArrayEquals(expected, splits.toArray(new String [0]));
+  }
+
+  public void testCommonPrefix() throws SQLException {
+    // Splits between 'Hand' and 'Hardy'
+    TextSplitter splitter = new TextSplitter();
+    List<String> splits = splitter.split(5, "nd", "rdy", "Ha");
+    // Don't check for exact values in the middle, because the splitter generates some
+    // ugly Unicode-isms. But do check that we get multiple splits and that it starts
+    // and ends on the correct points.
+    assertEquals("Hand", splits.get(0));
+    assertEquals("Hardy", splits.get(splits.size() -1));
+    assertEquals(6, splits.size());
+  }
+}
+
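In context, each pair of adjacent split points that TextSplitter returns becomes
the lower and upper bounding conditions of a DataDrivenDBInputSplit, as in the
split(Configuration, ResultSet, String) method above. A hypothetical one-off
driver (placed in the same package, since the string-level split() is
package-private; the column name "name" is an assumption) shows the resulting
WHERE-clause bounds:

    package org.apache.hadoop.mapreduce.lib.db;

    import java.sql.SQLException;
    import java.util.List;

    public class TextSplitDemo {
      public static void main(String[] args) throws SQLException {
        TextSplitter splitter = new TextSplitter();
        List<String> points = splitter.split(4, "A", "Z", "");
        for (int i = 1; i < points.size(); i++) {
          // The last interval is closed so the maximum value is included.
          String op = (i == points.size() - 1) ? " <= '" : " < '";
          System.out.println("name >= '" + points.get(i - 1) + "' AND name"
              + op + points.get(i) + "'");
        }
      }
    }

As the testCommonPrefix case notes, the interior boundary strings may contain
odd-looking Unicode characters; they only need to sort correctly, not to read
well.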


