From mapreduce-commits-return-264-apmail-hadoop-mapreduce-commits-archive=hadoop.apache.org@hadoop.apache.org Mon Sep 14 14:22:17 2009 Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 14219 invoked from network); 14 Sep 2009 14:22:17 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 14 Sep 2009 14:22:17 -0000 Received: (qmail 82406 invoked by uid 500); 14 Sep 2009 14:22:17 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 82359 invoked by uid 500); 14 Sep 2009 14:22:17 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 82348 invoked by uid 99); 14 Sep 2009 14:22:17 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Sep 2009 14:22:17 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Sep 2009 14:22:12 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id D1E622388867; Mon, 14 Sep 2009 14:21:52 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r814656 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapreduce/lib/db/ src/test/mapred/org/apache/hadoop/mapreduce/lib/db/ Date: Mon, 14 Sep 2009 14:21:52 -0000 To: mapreduce-commits@hadoop.apache.org From: enis@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090914142152.D1E622388867@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: enis Date: Mon Sep 14 14:21:51 2009 New Revision: 
814656 URL: http://svn.apache.org/viewvc?rev=814656&view=rev Log: MAPREDUCE-885. More efficient SQL queries for DBInputFormat. Contributed by Aaron Kimball. Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BigDecimalSplitter.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BooleanSplitter.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBSplitter.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java Modified: hadoop/mapreduce/trunk/CHANGES.txt hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java Modified: hadoop/mapreduce/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=814656&r1=814655&r2=814656&view=diff ============================================================================== --- hadoop/mapreduce/trunk/CHANGES.txt (original) +++ hadoop/mapreduce/trunk/CHANGES.txt Mon Sep 14 
14:21:51 2009 @@ -337,6 +337,9 @@ MAPREDUCE-856. Setup secure permissions for distributed cache files. (Vinod Kumar Vavilapalli via yhemanth) + MAPREDUCE-885. More efficient SQL queries for DBInputFormat. (Aaron Kimball + via enis) + BUG FIXES MAPREDUCE-878. Rename fair scheduler design doc to Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BigDecimalSplitter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BigDecimalSplitter.java?rev=814656&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BigDecimalSplitter.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BigDecimalSplitter.java Mon Sep 14 14:21:51 2009 @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.lib.db; + +import java.math.BigDecimal; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; + +/** + * Implement DBSplitter over BigDecimal values. + */ +public class BigDecimalSplitter implements DBSplitter { + private static final Log LOG = LogFactory.getLog(BigDecimalSplitter.class); + + public List split(Configuration conf, ResultSet results, String colName) + throws SQLException { + + BigDecimal minVal = results.getBigDecimal(1); + BigDecimal maxVal = results.getBigDecimal(2); + + String lowClausePrefix = colName + " >= "; + String highClausePrefix = colName + " < "; + + BigDecimal numSplits = new BigDecimal(conf.getInt("mapred.map.tasks", 1)); + + if (minVal == null && maxVal == null) { + // Range is null to null. Return a null split accordingly. + List splits = new ArrayList(); + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + colName + " IS NULL", colName + " IS NULL")); + return splits; + } + + if (minVal == null || maxVal == null) { + // Don't know what is a reasonable min/max value for interpolation. Fail. + LOG.error("Cannot find a range for NUMERIC or DECIMAL fields with one end NULL."); + return null; + } + + // Get all the split points together. + List splitPoints = split(numSplits, minVal, maxVal); + List splits = new ArrayList(); + + // Turn the split points into a set of intervals. + BigDecimal start = splitPoints.get(0); + for (int i = 1; i < splitPoints.size(); i++) { + BigDecimal end = splitPoints.get(i); + + if (i == splitPoints.size() - 1) { + // This is the last one; use a closed interval. 
+ splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + lowClausePrefix + start.toString(), + colName + " <= " + end.toString())); + } else { + // Normal open-interval case. + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + lowClausePrefix + start.toString(), + highClausePrefix + end.toString())); + } + + start = end; + } + + return splits; + } + + private static final BigDecimal MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE); + + /** + * Divide numerator by denominator. If impossible in exact mode, use rounding. + */ + protected BigDecimal tryDivide(BigDecimal numerator, BigDecimal denominator) { + try { + return numerator.divide(denominator); + } catch (ArithmeticException ae) { + return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP); + } + } + + /** + * Returns a list of BigDecimals one element longer than the list of input splits. + * This represents the boundaries between input splits. + * All splits are open on the top end, except the last one. + * + * So the list [0, 5, 8, 12, 18] would represent splits capturing the intervals: + * + * [0, 5) + * [5, 8) + * [8, 12) + * [12, 18] note the closed interval for the last split. + */ + List split(BigDecimal numSplits, BigDecimal minVal, BigDecimal maxVal) + throws SQLException { + + List splits = new ArrayList(); + + // Use numSplits as a hint. May need an extra task if the size doesn't + // divide cleanly. + + BigDecimal splitSize = tryDivide(maxVal.subtract(minVal), (numSplits)); + if (splitSize.compareTo(MIN_INCREMENT) < 0) { + splitSize = MIN_INCREMENT; + LOG.warn("Set BigDecimal splitSize to MIN_INCREMENT"); + } + + BigDecimal curVal = minVal; + + while (curVal.compareTo(maxVal) <= 0) { + splits.add(curVal); + curVal = curVal.add(splitSize); + } + + if (splits.get(splits.size() - 1).compareTo(maxVal) != 0 || splits.size() == 1) { + // We didn't end on the maxVal. Add that to the end of the list. 
+ splits.add(maxVal); + } + + return splits; + } +} Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BooleanSplitter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BooleanSplitter.java?rev=814656&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BooleanSplitter.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/BooleanSplitter.java Mon Sep 14 14:21:51 2009 @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.db; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; + +/** + * Implement DBSplitter over boolean values. 
+ */ +public class BooleanSplitter implements DBSplitter { + public List split(Configuration conf, ResultSet results, String colName) + throws SQLException { + + List splits = new ArrayList(); + + if (results.getString(1) == null && results.getString(2) == null) { + // Range is null to null. Return a null split accordingly. + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + colName + " IS NULL", colName + " IS NULL")); + return splits; + } + + boolean minVal = results.getBoolean(1); + boolean maxVal = results.getBoolean(2); + + // Use one or two splits. + if (!minVal) { + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + colName + " = FALSE", colName + " = FALSE")); + } + + if (maxVal) { + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + colName + " = TRUE", colName + " = TRUE")); + } + + if (results.getString(1) == null || results.getString(2) == null) { + // Include a null value. + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + colName + " IS NULL", colName + " IS NULL")); + } + + return splits; + } +} Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java?rev=814656&r1=814655&r2=814656&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java Mon Sep 14 14:21:51 2009 @@ -77,6 +77,10 @@ public static final String INPUT_COUNT_QUERY = "mapred.jdbc.input.count.query"; + /** Input query to get the max and min values of the jdbc.input.query */ + public static final String INPUT_BOUNDING_QUERY = + "mapred.jdbc.input.bounding.query"; + /** Class name implementing DBWritable which will hold input tuples */ public static 
final String INPUT_CLASS_PROPERTY = "mapred.jdbc.input.class"; @@ -207,7 +211,17 @@ conf.set(DBConfiguration.INPUT_COUNT_QUERY, query); } } - + + public void setInputBoundingQuery(String query) { + if (query != null && query.length() > 0) { + conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query); + } + } + + public String getInputBoundingQuery() { + return conf.get(DBConfiguration.INPUT_BOUNDING_QUERY); + } + public Class getInputClass() { return conf.getClass(DBConfiguration.INPUT_CLASS_PROPERTY, NullDBWritable.class); Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java?rev=814656&r1=814655&r2=814656&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java Mon Sep 14 14:21:51 2009 @@ -172,7 +172,15 @@ public DBConfiguration getDBConf() { return dbConf; } - + + public Connection getConnection() { + return connection; + } + + public String getDBProductName() { + return dbProductName; + } + protected RecordReader createDBRecordReader(DBInputSplit split, Configuration conf) throws IOException { Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java?rev=814656&r1=814655&r2=814656&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java Mon Sep 14 14:21:51 2009 @@ -30,6 +30,8 @@ import java.util.ArrayList; 
import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; @@ -49,6 +51,9 @@ */ public class DBRecordReader extends RecordReader { + + private static final Log LOG = LogFactory.getLog(DBRecordReader.class); + private ResultSet results = null; private Class inputClass; @@ -102,7 +107,7 @@ /** Returns the query for selecting the records, * subclasses can override this for custom behaviour.*/ protected String getSelectQuery() { - StringBuilder query = new StringBuilder(); + StringBuilder query = new StringBuilder(); // Default codepath for MySQL, HSQLDB, etc. Relies on LIMIT/OFFSET for splits. if(dbConf.getInputQuery() == null) { @@ -254,4 +259,12 @@ protected Connection getConnection() { return connection; } + + protected PreparedStatement getStatement() { + return statement; + } + + protected void setStatement(PreparedStatement stmt) { + this.statement = stmt; + } } Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBSplitter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBSplitter.java?rev=814656&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBSplitter.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBSplitter.java Mon Sep 14 14:21:51 2009 @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.db; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; + +/** + * DBSplitter will generate DBInputSplits to use with DataDrivenDBInputFormat. + * DataDrivenDBInputFormat needs to interpolate between two values that + * represent the lowest and highest valued records to import. Depending + * on the data-type of the column, this requires different behavior. + * DBSplitter implementations should perform this for a data type or family + * of data types. + */ +public interface DBSplitter { + /** + * Given a ResultSet containing one record (and already advanced to that record) + * with two columns (a low value, and a high value, both of the same type), determine + * a set of splits that span the given values. 
+ */ + List split(Configuration conf, ResultSet results, String colName) throws SQLException; +} Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java?rev=814656&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java Mon Sep 14 14:21:51 2009 @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.lib.db; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; + +/** + * A InputFormat that reads input data from an SQL table. + * Operates like DBInputFormat, but instead of using LIMIT and OFFSET to demarcate + * splits, it tries to generate WHERE clauses which separate the data into roughly + * equivalent shards. + */ +public class DataDrivenDBInputFormat + extends DBInputFormat implements Configurable { + + private static final Log LOG = LogFactory.getLog(DataDrivenDBInputFormat.class); + + /** If users are providing their own query, the following string is expected to + appear in the WHERE clause, which will be substituted with a pair of conditions + on the input to allow input splits to parallelise the import. 
*/ + public static final String SUBSTITUTE_TOKEN = "$CONDITIONS"; + + /** + * A InputSplit that spans a set of rows + */ + public static class DataDrivenDBInputSplit extends DBInputFormat.DBInputSplit { + + private String lowerBoundClause; + private String upperBoundClause; + + /** + * Default Constructor + */ + public DataDrivenDBInputSplit() { + } + + /** + * Convenience Constructor + * @param lower the string to be put in the WHERE clause to guard on the 'lower' end + * @param upper the string to be put in the WHERE clause to guard on the 'upper' end + */ + public DataDrivenDBInputSplit(final String lower, final String upper) { + this.lowerBoundClause = lower; + this.upperBoundClause = upper; + } + + + /** + * @return The total row count in this split + */ + public long getLength() throws IOException { + return 0; // unfortunately, we don't know this. + } + + /** {@inheritDoc} */ + public void readFields(DataInput input) throws IOException { + this.lowerBoundClause = Text.readString(input); + this.upperBoundClause = Text.readString(input); + } + + /** {@inheritDoc} */ + public void write(DataOutput output) throws IOException { + Text.writeString(output, this.lowerBoundClause); + Text.writeString(output, this.upperBoundClause); + } + + public String getLowerClause() { + return lowerBoundClause; + } + + public String getUpperClause() { + return upperBoundClause; + } + } + + /** + * @return the DBSplitter implementation to use to divide the table/query into InputSplits. 
+ */ + protected DBSplitter getSplitter(int sqlDataType) { + switch (sqlDataType) { + case Types.NUMERIC: + case Types.DECIMAL: + return new BigDecimalSplitter(); + + case Types.BIT: + case Types.BOOLEAN: + return new BooleanSplitter(); + + case Types.INTEGER: + case Types.TINYINT: + case Types.SMALLINT: + case Types.BIGINT: + return new IntegerSplitter(); + + case Types.REAL: + case Types.FLOAT: + case Types.DOUBLE: + return new FloatSplitter(); + + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGVARCHAR: + return new TextSplitter(); + + case Types.DATE: + case Types.TIME: + case Types.TIMESTAMP: + return new DateSplitter(); + + default: + // TODO: Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB, BLOB, ARRAY + // STRUCT, REF, DATALINK, and JAVA_OBJECT. + return null; + } + } + + /** {@inheritDoc} */ + public List getSplits(JobContext job) throws IOException { + + ResultSet results = null; + Statement statement = null; + try { + statement = getConnection().createStatement(); + + results = statement.executeQuery(getBoundingValsQuery()); + results.next(); + + // Based on the type of the results, use a different mechanism + // for interpolating split points (i.e., numeric splits, text splits, + // dates, etc.) + int sqlDataType = results.getMetaData().getColumnType(1); + DBSplitter splitter = getSplitter(sqlDataType); + if (null == splitter) { + throw new IOException("Unknown SQL data type: " + sqlDataType); + } + + return splitter.split(job.getConfiguration(), results, getDBConf().getInputOrderBy()); + } catch (SQLException e) { + throw new IOException(e.getMessage()); + } finally { + // More-or-less ignore SQL exceptions here, but log in case we need it. 
+ try { + if (null != results) { + results.close(); + } + } catch (SQLException se) { + LOG.debug("SQLException closing resultset: " + se.toString()); + } + + try { + if (null != statement) { + statement.close(); + } + } catch (SQLException se) { + LOG.debug("SQLException closing statement: " + se.toString()); + } + + try { + getConnection().commit(); + } catch (SQLException se) { + LOG.debug("SQLException committing split transaction: " + se.toString()); + } + } + } + + /** + * @return a query which returns the minimum and maximum values for + * the order-by column. + * + * The min value should be in the first column, and the + * max value should be in the second column of the results. + */ + protected String getBoundingValsQuery() { + // If the user has provided a query, use that instead. + String userQuery = getDBConf().getInputBoundingQuery(); + if (null != userQuery) { + return userQuery; + } + + // Auto-generate one based on the table name we've been provided with. + StringBuilder query = new StringBuilder(); + + String splitCol = getDBConf().getInputOrderBy(); + query.append("SELECT MIN(").append(splitCol).append("), "); + query.append("MAX(").append(splitCol).append(") FROM "); + query.append(getDBConf().getInputTableName()); + String conditions = getDBConf().getInputConditions(); + if (null != conditions) { + query.append(" WHERE ( " + conditions + " )"); + } + + return query.toString(); + } + + /** Set the user-defined bounding query to use with a user-defined query. + This *must* include the substring "$CONDITIONS" + (DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) inside the WHERE clause, + so that DataDrivenDBInputFormat knows where to insert split clauses. + e.g., "SELECT foo FROM mytable WHERE $CONDITIONS" + This will be expanded to something like: + SELECT foo FROM mytable WHERE (id > 100) AND (id < 250) + inside each split. 
+ */ + public static void setBoundingQuery(Configuration conf, String query) { + if (null != query) { + // If the user's setting a query, warn if they don't allow conditions. + if (query.indexOf(SUBSTITUTE_TOKEN) == -1) { + LOG.warn("Could not find " + SUBSTITUTE_TOKEN + " token in query: " + query + + "; splits may not partition data."); + } + } + + conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query); + } + + protected RecordReader createDBRecordReader(DBInputSplit split, + Configuration conf) throws IOException { + + DBConfiguration dbConf = getDBConf(); + @SuppressWarnings("unchecked") + Class inputClass = (Class) (dbConf.getInputClass()); + String dbProductName = getDBProductName(); + + try { + // use database product name to determine appropriate record reader. + if (dbProductName.startsWith("MYSQL")) { + // use MySQL-specific db reader. + return new MySQLDataDrivenDBRecordReader(split, inputClass, + conf, getConnection(), dbConf, dbConf.getInputConditions(), dbConf.getInputFieldNames(), + dbConf.getInputTableName()); + } else { + // Generic reader. + return new DataDrivenDBRecordReader(split, inputClass, + conf, getConnection(), dbConf, dbConf.getInputConditions(), dbConf.getInputFieldNames(), + dbConf.getInputTableName()); + } + } catch (SQLException ex) { + throw new IOException(ex.getMessage()); + } + } + + // Configuration methods override superclass to ensure that the proper + // DataDrivenDBInputFormat gets used. + + /** Note that the "orderBy" column is called the "splitBy" in this version. + * We reuse the same field, but it's not strictly ordering it -- just partitioning + * the results. + */ + public static void setInput(Job job, + Class inputClass, + String tableName,String conditions, + String splitBy, String... 
fieldNames) { + DBInputFormat.setInput(job, inputClass, tableName, conditions, splitBy, fieldNames); + job.setInputFormatClass(DataDrivenDBInputFormat.class); + } + + /** setInput() takes a custom query and a separate "bounding query" to use + instead of the custom "count query" used by DBInputFormat. + */ + public static void setInput(Job job, + Class inputClass, + String inputQuery, String inputBoundingQuery) { + DBInputFormat.setInput(job, inputClass, inputQuery, ""); + job.getConfiguration().set(DBConfiguration.INPUT_BOUNDING_QUERY, inputBoundingQuery); + job.setInputFormatClass(DataDrivenDBInputFormat.class); + } +} Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java?rev=814656&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBRecordReader.java Mon Sep 14 14:21:51 2009 @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.db; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; + +/** + * A RecordReader that reads records from a SQL table, + * using data-driven WHERE clause splits. + * Emits LongWritables containing the record number as + * key and DBWritables as value. 
+ */ +public class DataDrivenDBRecordReader extends DBRecordReader { + + private static final Log LOG = LogFactory.getLog(DataDrivenDBRecordReader.class); + + /** + * @param split The InputSplit to read data for + * @throws SQLException + */ + public DataDrivenDBRecordReader(DBInputFormat.DBInputSplit split, + Class inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig, + String cond, String [] fields, String table) + throws SQLException { + super(split, inputClass, conf, conn, dbConfig, cond, fields, table); + } + + /** Returns the query for selecting the records, + * subclasses can override this for custom behaviour.*/ + @SuppressWarnings("unchecked") + protected String getSelectQuery() { + StringBuilder query = new StringBuilder(); + DataDrivenDBInputFormat.DataDrivenDBInputSplit dataSplit = + (DataDrivenDBInputFormat.DataDrivenDBInputSplit) getSplit(); + DBConfiguration dbConf = getDBConf(); + String [] fieldNames = getFieldNames(); + String tableName = getTableName(); + String conditions = getConditions(); + + // Build the WHERE clauses associated with the data split first. + // We need them in both branches of this function. + StringBuilder conditionClauses = new StringBuilder(); + conditionClauses.append("( ").append(dataSplit.getLowerClause()); + conditionClauses.append(" ) AND ( ").append(dataSplit.getUpperClause()); + conditionClauses.append(" )"); + + if(dbConf.getInputQuery() == null) { + // We need to generate the entire query. + query.append("SELECT "); + + for (int i = 0; i < fieldNames.length; i++) { + query.append(fieldNames[i]); + if (i != fieldNames.length -1) { + query.append(", "); + } + } + + query.append(" FROM ").append(tableName); + query.append(" AS ").append(tableName); //in hsqldb this is necessary + query.append(" WHERE "); + if (conditions != null && conditions.length() > 0) { + // Put the user's conditions first. 
+ query.append("( ").append(conditions).append(" ) AND "); + } + + // Now append the conditions associated with our split. + query.append(conditionClauses.toString()); + + } else { + // User provided the query. We replace the special token with our WHERE clause. + String inputQuery = dbConf.getInputQuery(); + if (inputQuery.indexOf(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) == -1) { + LOG.error("Could not find the clause substitution token " + + DataDrivenDBInputFormat.SUBSTITUTE_TOKEN + " in the query: [" + + inputQuery + "]. Parallel splits may not work correctly."); + } + + query.append(inputQuery.replace(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN, + conditionClauses.toString())); + } + + return query.toString(); + } +} Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java?rev=814656&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DateSplitter.java Mon Sep 14 14:21:51 2009 @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.db; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; + +/** + * Implement DBSplitter over date/time values. + * Make use of logic from IntegerSplitter, since date/time are just longs + * in Java. + */ +public class DateSplitter extends IntegerSplitter { + + private static final Log LOG = LogFactory.getLog(DateSplitter.class); + + public List split(Configuration conf, ResultSet results, String colName) + throws SQLException { + + long minVal; + long maxVal; + + int sqlDataType = results.getMetaData().getColumnType(1); + minVal = resultSetColToLong(results, 1, sqlDataType); + maxVal = resultSetColToLong(results, 2, sqlDataType); + + String lowClausePrefix = colName + " >= '"; + String highClausePrefix = colName + " < '"; + + int numSplits = conf.getInt("mapred.map.tasks", 1); + if (numSplits < 1) { + numSplits = 1; + } + + if (minVal == Long.MIN_VALUE && maxVal == Long.MIN_VALUE) { + // The range of acceptable dates is NULL to NULL. Just create a single split. + List splits = new ArrayList(); + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + colName + " IS NULL", colName + " IS NULL")); + return splits; + } + + // Gather the split point integers + List splitPoints = split(numSplits, minVal, maxVal); + List splits = new ArrayList(); + + // Turn the split points into a set of intervals. 
+ long start = splitPoints.get(0); + Date startDate = longToDate(start, sqlDataType); + if (sqlDataType == Types.TIMESTAMP) { + // The lower bound's nanos value needs to match the actual lower-bound nanos. + try { + ((java.sql.Timestamp) startDate).setNanos(results.getTimestamp(1).getNanos()); + } catch (NullPointerException npe) { + // If the lower bound was NULL, we'll get an NPE; just ignore it and don't set nanos. + } + } + + for (int i = 1; i < splitPoints.size(); i++) { + long end = splitPoints.get(i); + Date endDate = longToDate(end, sqlDataType); + + if (i == splitPoints.size() - 1) { + if (sqlDataType == Types.TIMESTAMP) { + // The upper bound's nanos value needs to match the actual upper-bound nanos. + try { + ((java.sql.Timestamp) endDate).setNanos(results.getTimestamp(2).getNanos()); + } catch (NullPointerException npe) { + // If the upper bound was NULL, we'll get an NPE; just ignore it and don't set nanos. + } + } + // This is the last one; use a closed interval. + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + lowClausePrefix + startDate.toString() + "'", + colName + " <= '" + endDate.toString() + "'")); + } else { + // Normal open-interval case. + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + lowClausePrefix + startDate.toString() + "'", + highClausePrefix + endDate.toString() + "'")); + } + + start = end; + startDate = endDate; + } + + if (minVal == Long.MIN_VALUE || maxVal == Long.MIN_VALUE) { + // Add an extra split to handle the null case that we saw. + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + colName + " IS NULL", colName + " IS NULL")); + } + + return splits; + } + + /** Retrieve the value from the column in a type-appropriate manner and return + its timestamp since the epoch. If the column is null, then return Long.MIN_VALUE. 
+ This will cause a special split to be generated for the NULL case, but may also + cause poorly-balanced splits if most of the actual dates are positive time + since the epoch, etc. + */ + private long resultSetColToLong(ResultSet rs, int colNum, int sqlDataType) throws SQLException { + try { + switch (sqlDataType) { + case Types.DATE: + return rs.getDate(1).getTime(); + case Types.TIME: + return rs.getTime(1).getTime(); + case Types.TIMESTAMP: + return rs.getTimestamp(1).getTime(); + default: + throw new SQLException("Not a date-type field"); + } + } catch (NullPointerException npe) { + // null column. return minimum long value. + LOG.warn("Encountered a NULL date in the split column. Splits may be poorly balanced."); + return Long.MIN_VALUE; + } + } + + /** Parse the long-valued timestamp into the appropriate SQL date type. */ + private Date longToDate(long val, int sqlDataType) { + switch (sqlDataType) { + case Types.DATE: + return new java.sql.Date(val); + case Types.TIME: + return new java.sql.Time(val); + case Types.TIMESTAMP: + return new java.sql.Timestamp(val); + default: // Shouldn't ever hit this case. + return null; + } + } +} Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java?rev=814656&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/FloatSplitter.java Mon Sep 14 14:21:51 2009 @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.db; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; + +/** + * Implement DBSplitter over floating-point values. + */ +public class FloatSplitter implements DBSplitter { + + private static final double MIN_INCREMENT = 10000 * Double.MIN_VALUE; + + public List split(Configuration conf, ResultSet results, String colName) + throws SQLException { + + List splits = new ArrayList(); + + if (results.getString(1) == null && results.getString(2) == null) { + // Range is null to null. Return a null split accordingly. + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + colName + " IS NULL", colName + " IS NULL")); + return splits; + } + + double minVal = results.getDouble(1); + double maxVal = results.getDouble(2); + + // Use this as a hint. May need an extra task if the size doesn't + // divide cleanly. 
+ int numSplits = conf.getInt("mapred.map.tasks", 1); + double splitSize = (maxVal - minVal) / (double) numSplits; + + if (splitSize < MIN_INCREMENT) { + splitSize = MIN_INCREMENT; + } + + String lowClausePrefix = colName + " >= "; + String highClausePrefix = colName + " < "; + + double curLower = minVal; + double curUpper = curLower + splitSize; + + while (curUpper < maxVal) { + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + lowClausePrefix + Double.toString(curLower), + highClausePrefix + Double.toString(curUpper))); + + curLower = curUpper; + curUpper += splitSize; + } + + // Catch any overage and create the closed interval for the last split. + if (curLower <= maxVal || splits.size() == 1) { + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + lowClausePrefix + Double.toString(curUpper), + colName + " <= " + Double.toString(maxVal))); + } + + if (results.getString(1) == null || results.getString(2) == null) { + // At least one extrema is null; add a null split. + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + colName + " IS NULL", colName + " IS NULL")); + } + + return splits; + } +} Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java?rev=814656&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/IntegerSplitter.java Mon Sep 14 14:21:51 2009 @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.db; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; + +/** + * Implement DBSplitter over integer values. + */ +public class IntegerSplitter implements DBSplitter { + public List split(Configuration conf, ResultSet results, String colName) + throws SQLException { + + long minVal = results.getLong(1); + long maxVal = results.getLong(2); + + String lowClausePrefix = colName + " >= "; + String highClausePrefix = colName + " < "; + + int numSplits = conf.getInt("mapred.map.tasks", 1); + if (numSplits < 1) { + numSplits = 1; + } + + if (results.getString(1) == null && results.getString(2) == null) { + // Range is null to null. Return a null split accordingly. + List splits = new ArrayList(); + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + colName + " IS NULL", colName + " IS NULL")); + return splits; + } + + // Get all the split points together. + List splitPoints = split(numSplits, minVal, maxVal); + List splits = new ArrayList(); + + // Turn the split points into a set of intervals. 
+ long start = splitPoints.get(0); + for (int i = 1; i < splitPoints.size(); i++) { + long end = splitPoints.get(i); + + if (i == splitPoints.size() - 1) { + // This is the last one; use a closed interval. + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + lowClausePrefix + Long.toString(start), + colName + " <= " + Long.toString(end))); + } else { + // Normal open-interval case. + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + lowClausePrefix + Long.toString(start), + highClausePrefix + Long.toString(end))); + } + + start = end; + } + + if (results.getString(1) == null || results.getString(2) == null) { + // At least one extrema is null; add a null split. + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + colName + " IS NULL", colName + " IS NULL")); + } + + return splits; + } + + /** + * Returns a list of longs one element longer than the list of input splits. + * This represents the boundaries between input splits. + * All splits are open on the top end, except the last one. + * + * So the list [0, 5, 8, 12, 18] would represent splits capturing the intervals: + * + * [0, 5) + * [5, 8) + * [8, 12) + * [12, 18] note the closed interval for the last split. + */ + List split(long numSplits, long minVal, long maxVal) + throws SQLException { + + List splits = new ArrayList(); + + // Use numSplits as a hint. May need an extra task if the size doesn't + // divide cleanly. + + long splitSize = (maxVal - minVal) / numSplits; + if (splitSize < 1) { + splitSize = 1; + } + + long curVal = minVal; + + while (curVal <= maxVal) { + splits.add(curVal); + curVal += splitSize; + } + + if (splits.get(splits.size() - 1) != maxVal || splits.size() == 1) { + // We didn't end on the maxVal. Add that to the end of the list. 
+ splits.add(maxVal); + } + + return splits; + } +} Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java?rev=814656&r1=814655&r2=814656&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java Mon Sep 14 14:21:51 2009 @@ -41,6 +41,7 @@ PreparedStatement statement = getConnection().prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time. + setStatement(statement); // save a ref for cleanup in close() return statement.executeQuery(); } } Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java?rev=814656&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java Mon Sep 14 14:21:51 2009 @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.db; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +import org.apache.hadoop.conf.Configuration; + +/** + * A RecordReader that reads records from a MySQL table via DataDrivenDBRecordReader + */ +public class MySQLDataDrivenDBRecordReader + extends DataDrivenDBRecordReader { + + public MySQLDataDrivenDBRecordReader(DBInputFormat.DBInputSplit split, + Class inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig, + String cond, String [] fields, String table) throws SQLException { + super(split, inputClass, conf, conn, dbConfig, cond, fields, table); + } + + // Execute statements for mysql in unbuffered mode. + protected ResultSet executeQuery(String query) throws SQLException { + PreparedStatement statement = getConnection().prepareStatement(query, + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time. + setStatement(statement); // save a ref so the close() method cleans this up. 
+ return statement.executeQuery(); + } +} Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java?rev=814656&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java Mon Sep 14 14:21:51 2009 @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.db; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; + +/** + * Implement DBSplitter over text strings. 
+ */ +public class TextSplitter extends BigDecimalSplitter { + + private static final Log LOG = LogFactory.getLog(TextSplitter.class); + + /** + * This method needs to determine the splits between two user-provided strings. + * In the case where the user's strings are 'A' and 'Z', this is not hard; we + * could create two splits from ['A', 'M') and ['M', 'Z'], 26 splits for strings + * beginning with each letter, etc. + * + * If a user has provided us with the strings "Ham" and "Haze", however, we need + * to create splits that differ in the third letter. + * + * The algorithm used is as follows: + * Since there are 2**16 unicode characters, we interpret characters as digits in + * base 65536. Given a string 's' containing characters s_0, s_1 .. s_n, we interpret + * the string as the number: 0.s_0 s_1 s_2.. s_n in base 65536. Having mapped the + * low and high strings into floating-point values, we then use the BigDecimalSplitter + * to establish the even split points, then map the resulting floating point values + * back into strings. + */ + public List split(Configuration conf, ResultSet results, String colName) + throws SQLException { + + LOG.warn("Generating splits for a textual index column."); + LOG.warn("If your database sorts in a case-insensitive order, " + + "this may result in a partial import or duplicate records."); + LOG.warn("You are strongly encouraged to choose a numeric split column."); + + String minString = results.getString(1); + String maxString = results.getString(2); + + boolean minIsNull = false; + + // If the min value is null, switch it to an empty string instead for purposes + // of interpolation. Then add [null, null] as a special case split. + if (null == minString) { + minString = ""; + minIsNull = true; + } + + if (null == maxString) { + // If the max string is null, then the min string has to be null too. + // Just return a special split for this case. 
+ List splits = new ArrayList(); + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + colName + " IS NULL", colName + " IS NULL")); + return splits; + } + + // Use this as a hint. May need an extra task if the size doesn't + // divide cleanly. + int numSplits = conf.getInt("mapred.map.tasks", 1); + + String lowClausePrefix = colName + " >= '"; + String highClausePrefix = colName + " < '"; + + // If there is a common prefix between minString and maxString, establish it + // and pull it out of minString and maxString. + int maxPrefixLen = Math.min(minString.length(), maxString.length()); + int sharedLen; + for (sharedLen = 0; sharedLen < maxPrefixLen; sharedLen++) { + char c1 = minString.charAt(sharedLen); + char c2 = maxString.charAt(sharedLen); + if (c1 != c2) { + break; + } + } + + // The common prefix has length 'sharedLen'. Extract it from both. + String commonPrefix = minString.substring(0, sharedLen); + minString = minString.substring(sharedLen); + maxString = maxString.substring(sharedLen); + + List splitStrings = split(numSplits, minString, maxString, commonPrefix); + List splits = new ArrayList(); + + // Convert the list of split point strings into an actual set of InputSplits. + String start = splitStrings.get(0); + for (int i = 1; i < splitStrings.size(); i++) { + String end = splitStrings.get(i); + + if (i == splitStrings.size() - 1) { + // This is the last one; use a closed interval. + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + lowClausePrefix + start + "'", colName + " <= '" + end + "'")); + } else { + // Normal open-interval case. + splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + lowClausePrefix + start + "'", highClausePrefix + end + "'")); + } + } + + if (minIsNull) { + // Add the special null split at the end. 
+ splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit( + colName + " IS NULL", colName + " IS NULL")); + } + + return splits; + } + + List split(int numSplits, String minString, String maxString, String commonPrefix) + throws SQLException { + + BigDecimal minVal = stringToBigDecimal(minString); + BigDecimal maxVal = stringToBigDecimal(maxString); + + List splitPoints = split(new BigDecimal(numSplits), minVal, maxVal); + List splitStrings = new ArrayList(); + + // Convert the BigDecimal splitPoints into their string representations. + for (BigDecimal bd : splitPoints) { + splitStrings.add(commonPrefix + bigDecimalToString(bd)); + } + + // Make sure that our user-specified boundaries are the first and last entries + // in the array. + if (splitStrings.size() == 0 || !splitStrings.get(0).equals(commonPrefix + minString)) { + splitStrings.add(0, commonPrefix + minString); + } + if (splitStrings.size() == 1 + || !splitStrings.get(splitStrings.size() - 1).equals(commonPrefix + maxString)) { + splitStrings.add(commonPrefix + maxString); + } + + return splitStrings; + } + + private final static BigDecimal ONE_PLACE = new BigDecimal(65536); + + // Maximum number of characters to convert. This is to prevent rounding errors + // or repeating fractions near the very bottom from getting out of control. Note + // that this still gives us a huge number of possible splits. + private final static int MAX_CHARS = 8; + + /** + * Return a BigDecimal representation of string 'str' suitable for use + * in a numerically-sorting order. + */ + BigDecimal stringToBigDecimal(String str) { + BigDecimal result = BigDecimal.ZERO; + BigDecimal curPlace = ONE_PLACE; // start with 1/65536 to compute the first digit. + + int len = Math.min(str.length(), MAX_CHARS); + + for (int i = 0; i < len; i++) { + int codePoint = str.codePointAt(i); + result = result.add(tryDivide(new BigDecimal(codePoint), curPlace)); + // advance to the next less significant place. 
e.g., 1/(65536^2) for the second char. + curPlace = curPlace.multiply(ONE_PLACE); + } + + return result; + } + + /** + * Return the string encoded in a BigDecimal. + * Repeatedly multiply the input value by 65536; the integer portion after such a multiplication + * represents a single character in base 65536. Convert that back into a char and create a + * string out of these until we have no data left. + */ + String bigDecimalToString(BigDecimal bd) { + BigDecimal cur = bd.stripTrailingZeros(); + StringBuilder sb = new StringBuilder(); + + for (int numConverted = 0; numConverted < MAX_CHARS; numConverted++) { + cur = cur.multiply(ONE_PLACE); + int curCodePoint = cur.intValue(); + if (0 == curCodePoint) { + break; + } + + cur = cur.subtract(new BigDecimal(curCodePoint)); + sb.append(Character.toChars(curCodePoint)); + } + + return sb.toString(); + } +} Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java?rev=814656&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java (added) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java Mon Sep 14 14:21:51 2009 @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.lib.db; + +import java.io.IOException; +import java.math.BigDecimal; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import junit.framework.TestCase; + +public class TestIntegerSplitter extends TestCase { + private long [] toLongArray(List in) { + long [] out = new long[in.size()]; + for (int i = 0; i < in.size(); i++) { + out[i] = in.get(i).longValue(); + } + + return out; + } + + public String formatLongArray(long [] ar) { + StringBuilder sb = new StringBuilder(); + sb.append("["); + boolean first = true; + for (long val : ar) { + if (!first) { + sb.append(", "); + } + + sb.append(Long.toString(val)); + first = false; + } + + sb.append("]"); + return sb.toString(); + } + + public void assertLongArrayEquals(long [] expected, long [] actual) { + for (int i = 0; i < expected.length; i++) { + try { + assertEquals("Failure at position " + i + "; got " + actual[i] + + " instead of " + expected[i] + "; actual array is " + formatLongArray(actual), + expected[i], actual[i]); + } catch (ArrayIndexOutOfBoundsException oob) { + fail("Expected array with " + expected.length + " elements; got " + actual.length + + ". Actual array is " + formatLongArray(actual)); + } + } + + if (actual.length > expected.length) { + fail("Actual array has " + actual.length + " elements; expected " + expected.length + + ". 
ACtual array is " + formatLongArray(actual)); + } + } + + public void testEvenSplits() throws SQLException { + List splits = new IntegerSplitter().split(10, 0, 100); + long [] expected = { 0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100 }; + assertLongArrayEquals(expected, toLongArray(splits)); + } + + public void testOddSplits() throws SQLException { + List splits = new IntegerSplitter().split(10, 0, 95); + long [] expected = { 0, 9, 18, 27, 36, 45, 54, 63, 72, 81, 90, 95 }; + assertLongArrayEquals(expected, toLongArray(splits)); + + } + + public void testSingletonSplit() throws SQLException { + List splits = new IntegerSplitter().split(1, 5, 5); + long [] expected = { 5, 5 }; + assertLongArrayEquals(expected, toLongArray(splits)); + } + + public void testSingletonSplit2() throws SQLException { + // Same test, but overly-high numSplits + List splits = new IntegerSplitter().split(5, 5, 5); + long [] expected = { 5, 5 }; + assertLongArrayEquals(expected, toLongArray(splits)); + } + + public void testTooManySplits() throws SQLException { + List splits = new IntegerSplitter().split(5, 3, 5); + long [] expected = { 3, 4, 5 }; + assertLongArrayEquals(expected, toLongArray(splits)); + } + +} + Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java?rev=814656&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java (added) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java Mon Sep 14 14:21:51 2009 @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.lib.db; + +import java.io.IOException; +import java.math.BigDecimal; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import junit.framework.TestCase; + +public class TestTextSplitter extends TestCase { + + public String formatArray(Object [] ar) { + StringBuilder sb = new StringBuilder(); + sb.append("["); + boolean first = true; + for (Object val : ar) { + if (!first) { + sb.append(", "); + } + + sb.append(val.toString()); + first = false; + } + + sb.append("]"); + return sb.toString(); + } + + public void assertArrayEquals(Object [] expected, Object [] actual) { + for (int i = 0; i < expected.length; i++) { + try { + assertEquals("Failure at position " + i + "; got " + actual[i] + + " instead of " + expected[i] + "; actual array is " + formatArray(actual), + expected[i], actual[i]); + } catch (ArrayIndexOutOfBoundsException oob) { + fail("Expected array with " + expected.length + " elements; got " + actual.length + + ". Actual array is " + formatArray(actual)); + } + } + + if (actual.length > expected.length) { + fail("Actual array has " + actual.length + " elements; expected " + expected.length + + ". 
Actual array is " + formatArray(actual)); + } + } + + public void testStringConvertEmpty() { + TextSplitter splitter = new TextSplitter(); + BigDecimal emptyBigDec = splitter.stringToBigDecimal(""); + assertEquals(BigDecimal.ZERO, emptyBigDec); + } + + public void testBigDecConvertEmpty() { + TextSplitter splitter = new TextSplitter(); + String emptyStr = splitter.bigDecimalToString(BigDecimal.ZERO); + assertEquals("", emptyStr); + } + + public void testConvertA() { + TextSplitter splitter = new TextSplitter(); + String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("A")); + assertEquals("A", out); + } + + public void testConvertZ() { + TextSplitter splitter = new TextSplitter(); + String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("Z")); + assertEquals("Z", out); + } + + public void testConvertThreeChars() { + TextSplitter splitter = new TextSplitter(); + String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("abc")); + assertEquals("abc", out); + } + + public void testConvertStr() { + TextSplitter splitter = new TextSplitter(); + String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("big str")); + assertEquals("big str", out); + } + + public void testConvertChomped() { + TextSplitter splitter = new TextSplitter(); + String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("AVeryLongStringIndeed")); + assertEquals("AVeryLon", out); + } + + public void testAlphabetSplit() throws SQLException { + // This should give us 25 splits, one per letter. 
+ TextSplitter splitter = new TextSplitter(); + List splits = splitter.split(25, "A", "Z", ""); + String [] expected = { "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", + "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z" }; + assertArrayEquals(expected, splits.toArray(new String [0])); + } + + public void testCommonPrefix() throws SQLException { + // Splits between 'Hand' and 'Hardy' + TextSplitter splitter = new TextSplitter(); + List splits = splitter.split(5, "nd", "rdy", "Ha"); + // Don't check for exact values in the middle, because the splitter generates some + // ugly Unicode-isms. But do check that we get multiple splits and that it starts + // and ends on the correct points. + assertEquals("Hand", splits.get(0)); + assertEquals("Hardy", splits.get(splits.size() -1)); + assertEquals(6, splits.size()); + } +} +