hbase-commits mailing list archives

From: st...@apache.org
Subject: svn commit: r789846 [1/2] - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/mapreduce/
Date: Tue, 30 Jun 2009 17:42:54 GMT
Author: stack
Date: Tue Jun 30 17:42:53 2009
New Revision: 789846

URL: http://svn.apache.org/viewvc?rev=789846&view=rev
Log:
HBASE-1385 Revamp TableInputFormat, needs updating to match hadoop 0.20.x AND remove bit where we can make < maps than regions

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Scan.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/BuildTableIndex.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMap.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMap.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReduce.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IndexConfiguration.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IndexOutputFormat.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IndexTableReduce.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/LuceneDocumentWrapper.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMap.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableReduce.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/package-info.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=789846&r1=789845&r2=789846&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Tue Jun 30 17:42:53 2009
@@ -421,6 +421,9 @@
    HBASE-1587  Update ganglia config and doc to account for ganglia 3.1 and
                hadoop-4675
    HBASE-1589  Up zk maxClientCnxns from default of 10 to 20 or 30 or so
+   HBASE-1385  Revamp TableInputFormat, needs updating to match hadoop 0.20.x
+               AND remove bit where we can make < maps than regions
+               (Lars George via Stack)
 
   OPTIMIZATIONS
    HBASE-1412  Change values for delete column and column family in KeyValue

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Scan.java?rev=789846&r1=789845&r2=789846&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Scan.java Tue Jun 30 17:42:53 2009
@@ -109,6 +109,33 @@
   }
   
   /**
+   * Creates a new instance of this class while copying all values.
+   * 
+   * @param scan  The scan instance to copy from.
+   * @throws IOException When copying the values fails.
+   */
+  public Scan(Scan scan) throws IOException {
+    startRow = scan.getStartRow();
+    stopRow  = scan.getStopRow();
+    maxVersions = scan.getMaxVersions();
+    filter = scan.getFilter(); // clone?
+    oldFilter = scan.getOldFilter(); // clone?
+    TimeRange ctr = scan.getTimeRange();
+    tr = new TimeRange(ctr.getMin(), ctr.getMax());
+    Map<byte[], NavigableSet<byte[]>> fams = scan.getFamilyMap();
+    for (byte[] fam : fams.keySet()) {
+      NavigableSet<byte[]> cols = fams.get(fam);
+      if (cols != null && cols.size() > 0) {
+        for (byte[] col : cols) {
+          addColumn(fam, col);
+        }
+      } else {
+        addFamily(fam);
+      }
+    }
+  }
+
+  /**
    * Get all columns from the specified family.
    * <p>
    * Overrides previous calls to addColumn for this family.
@@ -137,26 +164,89 @@
 
     return this;
   }
+
+  /**
+   * Parses a combined family and qualifier and adds either both or just the 
+   * family in case there is no qualifier. This assumes the older colon 
+   * delimited notation, e.g. "data:contents" or "meta:".
+   * <p>
+   * Note: It will throw an error when the colon is missing.
+   * 
+   * @param familyAndQualifier
+   * @return A reference to this instance.
+   * @throws IllegalArgumentException When the colon is missing.
+   */
+  public Scan addColumn(byte[] familyAndQualifier) {
+    byte[][] fq = KeyValue.parseColumn(familyAndQualifier);
+    if (fq[1].length > 0) {
+      addColumn(fq[0], fq[1]);  
+    } else {
+      addFamily(fq[0]);
+    }
+    return this;
+  }
   
   /**
    * Adds an array of columns specified the old format, family:qualifier.
    * <p>
    * Overrides previous calls to addFamily for any families in the input.
+   * 
    * @param columns array of columns, formatted as <pre>family:qualifier</pre>
    */
   public Scan addColumns(byte [][] columns) {
     for(int i=0; i<columns.length; i++) {
-      try {
-        byte [][] split = KeyValue.parseColumn(columns[i]);
-        if (split[1].length == 0) {
-          addFamily(split[0]);
-        } else {
-          addColumn(split[0], split[1]);
-        }
-      } catch(Exception e) {}
+      addColumn(columns[i]);
     }
     return this;
   }
+
+  /**
+   * Convenience method to help parse old style column definitions, e.g. as
+   * entered by a user on the command line: "data:contents mime:". The columns
+   * must be space delimited and always have a colon (":") to denote family
+   * and qualifier.
+   * 
+   * @param columns  The columns to parse.
+   * @return A reference to this instance.
+   */
+  public Scan addColumns(String columns) {
+    String[] cols = columns.split(" ");
+    for (String col : cols) {
+      addColumn(Bytes.toBytes(col));
+    }
+    return this;
+  }
+
+  /**
+   * Helps to convert the binary column families and qualifiers to a text 
+   * representation, e.g. "data:mimetype data:contents meta:". Binary values
+   * are properly encoded using {@link Bytes#toStringBinary(byte[])}.
+   * 
+   * @return The columns in an old style string format.
+   */
+  public String getInputColumns() {
+    String cols = "";
+    for (Map.Entry<byte[], NavigableSet<byte[]>> e : 
+      familyMap.entrySet()) {
+      byte[] fam = e.getKey();
+      if (cols.length() > 0) cols += " ";
+      NavigableSet<byte[]> quals = e.getValue();
+      // check if this family has qualifiers
+      if (quals != null && quals.size() > 0) {
+        String cs = "";
+        for (byte[] qual : quals) {
+          if (cs.length() > 0) cs += " ";
+          // encode values to make parsing easier later
+          cs += Bytes.toStringBinary(fam) + ":" + Bytes.toStringBinary(qual);
+        }
+        cols += cs;
+      } else {
+        // only add the family but with old style delimiter 
+        cols += Bytes.toStringBinary(fam) + ":";
+      }
+    }
+    return cols;
+  }
   
   /**
    * Get versions of columns only within the specified timestamp range,
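
Taken together, the Scan additions above restore the old colon-delimited column syntax on top of the new client API. A minimal usage sketch; the families "data" and "meta" are placeholders, not part of this commit:

    import org.apache.hadoop.hbase.client.Scan;

    public class ScanColumnsExample {
      public static void main(String[] args) throws Exception {
        Scan scan = new Scan();
        // Old-style, space-delimited columns; a trailing colon ("meta:")
        // selects a whole family.
        scan.addColumns("data:contents meta:");
        // Round-trip the family map back into the old string format.
        System.out.println(scan.getInputColumns());
        // The new copy constructor clones start/stop row, max versions,
        // time range, and the family map; it may throw IOException.
        Scan copy = new Scan(scan);
        System.out.println(copy.getInputColumns());
      }
    }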

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/BuildTableIndex.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/BuildTableIndex.java?rev=789846&r1=789845&r2=789846&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/BuildTableIndex.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/BuildTableIndex.java Tue Jun 30 17:42:53 2009
@@ -27,9 +27,10 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
 
 /**
  * Example table column indexing class.  Runs a mapreduce job to index
@@ -51,32 +52,36 @@
  * </ul>
  */
 public class BuildTableIndex {
+
   private static final String USAGE = "Usage: BuildTableIndex " +
-    "-m <numMapTasks> -r <numReduceTasks>\n  -indexConf <iconfFile> " +
-    "-indexDir <indexDir>\n  -table <tableName> -columns <columnName1> " +
+    "-r <numReduceTasks> -indexConf <iconfFile>\n" +
+    "-indexDir <indexDir> -table <tableName>\n -columns <columnName1> " +
     "[<columnName2> ...]";
 
+  /**
+   * Prints the usage message and exits the program.
+   * 
+   * @param message  The message to print first.
+   */
   private static void printUsage(String message) {
     System.err.println(message);
     System.err.println(USAGE);
     System.exit(-1);
   }
 
-  /** default constructor */
-  public BuildTableIndex() {
-    super();
-  }
-
   /**
-   * @param args
-   * @throws IOException
+   * Creates a new job.
+   * 
+   * @param conf  The current configuration.
+   * @param args  The command line arguments.
+   * @throws IOException When reading the configuration fails.
    */
-  public void run(String[] args) throws IOException {
+  public static Job createSubmittableJob(Configuration conf, String[] args) 
+  throws IOException {
     if (args.length < 6) {
       printUsage("Too few arguments");
     }
 
-    int numMapTasks = 1;
     int numReduceTasks = 1;
     String iconfFile = null;
     String indexDir = null;
@@ -85,9 +90,7 @@
 
     // parse args
     for (int i = 0; i < args.length - 1; i++) {
-      if ("-m".equals(args[i])) {
-        numMapTasks = Integer.parseInt(args[++i]);
-      } else if ("-r".equals(args[i])) {
+      if ("-r".equals(args[i])) {
         numReduceTasks = Integer.parseInt(args[++i]);
       } else if ("-indexConf".equals(args[i])) {
         iconfFile = args[++i];
@@ -111,7 +114,6 @@
         "be specified");
     }
 
-    Configuration conf = new HBaseConfiguration();
     if (iconfFile != null) {
       // set index configuration content from a file
       String content = readContent(iconfFile);
@@ -121,52 +123,31 @@
       conf.set("hbase.index.conf", content);
     }
 
-    if (columnNames != null) {
-      JobConf jobConf = createJob(conf, numMapTasks, numReduceTasks, indexDir,
-          tableName, columnNames.toString());
-      JobClient.runJob(jobConf);
-    }
-  }
-
-  /**
-   * @param conf
-   * @param numMapTasks
-   * @param numReduceTasks
-   * @param indexDir
-   * @param tableName
-   * @param columnNames
-   * @return JobConf
-   */
-  public JobConf createJob(Configuration conf, int numMapTasks,
-      int numReduceTasks, String indexDir, String tableName,
-      String columnNames) {
-    JobConf jobConf = new JobConf(conf, BuildTableIndex.class);
-    jobConf.setJobName("build index for table " + tableName);
-    jobConf.setNumMapTasks(numMapTasks);
+    Job job = new Job(conf, "build index for table " + tableName);
     // number of indexes to partition into
-    jobConf.setNumReduceTasks(numReduceTasks);
-
+    job.setNumReduceTasks(numReduceTasks);
+    Scan scan = new Scan();
+    scan.addColumns(columnNames.toString());
     // use identity map (a waste, but just as an example)
-    IdentityTableMap.initJob(tableName, columnNames, IdentityTableMap.class,
-        jobConf);
-
+    IdentityTableMapper.initJob(tableName, scan, 
+      IdentityTableMapper.class, job);
     // use IndexTableReduce to build a Lucene index
-    jobConf.setReducerClass(IndexTableReduce.class);
-    FileOutputFormat.setOutputPath(jobConf, new Path(indexDir));
-    jobConf.setOutputFormat(IndexOutputFormat.class);
-
-    return jobConf;
+    job.setReducerClass(IndexTableReducer.class);
+    FileOutputFormat.setOutputPath(job, new Path(indexDir));
+    job.setOutputFormatClass(IndexOutputFormat.class);
+    return job;
   }
 
-  /*
-   * Read xml file of indexing configurations.  The xml format is similar to
+  /**
+   * Reads xml file of indexing configurations.  The xml format is similar to
    * hbase-default.xml and hadoop-default.xml. For an example configuration,
-   * see the <code>createIndexConfContent</code> method in TestTableIndex
-   * @param fileName File to read.
-   * @return XML configuration read from file
-   * @throws IOException
+   * see the <code>createIndexConfContent</code> method in TestTableIndex.
+   * 
+   * @param fileName  The file to read.
+   * @return XML configuration read from file.
+   * @throws IOException When the XML is broken.
    */
-  private String readContent(String fileName) throws IOException {
+  private static String readContent(String fileName) throws IOException {
     File file = new File(fileName);
     int length = (int) file.length();
     if (length == 0) {
@@ -195,11 +176,17 @@
   }
 
   /**
-   * @param args
-   * @throws IOException
+   * The main entry point.
+   * 
+   * @param args  The command line arguments.
+   * @throws Exception When running the job fails.
    */
-  public static void main(String[] args) throws IOException {
-    BuildTableIndex build = new BuildTableIndex();
-    build.run(args);
+  public static void main(String[] args) throws Exception {
+    HBaseConfiguration conf = new HBaseConfiguration();
+    String[] otherArgs = 
+      new GenericOptionsParser(conf, args).getRemainingArgs();
+    Job job = createSubmittableJob(conf, otherArgs);
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
+  
 }
\ No newline at end of file
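
Distilled, the hadoop-0.20 driver shape this file converges on is short. A sketch under the assumption that IdentityTableMapper.initJob has the signature used in the hunk above; the job name and column are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class TableJobDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HBaseConfiguration();
        // GenericOptionsParser strips -D and other generic options first.
        String[] otherArgs =
          new GenericOptionsParser(conf, args).getRemainingArgs();
        Job job = new Job(conf, "example table job");
        Scan scan = new Scan();
        scan.addColumns("data:contents");          // placeholder column
        IdentityTableMapper.initJob(otherArgs[0],  // table name argument
          scan, IdentityTableMapper.class, job);
        job.setNumReduceTasks(0);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }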

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMap.java?rev=789846&r1=789845&r2=789846&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMap.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMap.java Tue Jun 30 17:42:53 2009
@@ -1,158 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * Extract grouping columns from input record
- */
-public class GroupingTableMap
-extends MapReduceBase
-implements TableMap<ImmutableBytesWritable,Result> {
-
-  /**
-   * JobConf parameter to specify the columns used to produce the key passed to 
-   * collect from the map phase
-   */
-  public static final String GROUP_COLUMNS =
-    "hbase.mapred.groupingtablemap.columns";
-  
-  protected byte [][] m_columns;
-
-  /**
-   * Use this before submitting a TableMap job. It will appropriately set up the
-   * JobConf.
-   *
-   * @param table table to be processed
-   * @param columns space separated list of columns to fetch
-   * @param groupColumns space separated list of columns used to form the key
-   * used in collect
-   * @param mapper map class
-   * @param job job configuration object
-   */
-  @SuppressWarnings("unchecked")
-  public static void initJob(String table, String columns, String groupColumns, 
-    Class<? extends TableMap> mapper, JobConf job) {
-    
-    TableMapReduceUtil.initTableMapJob(table, columns, mapper,
-        ImmutableBytesWritable.class, Result.class, job);
-    job.set(GROUP_COLUMNS, groupColumns);
-  }
-
-  @Override
-  public void configure(JobConf job) {
-    super.configure(job);
-    String[] cols = job.get(GROUP_COLUMNS, "").split(" ");
-    m_columns = new byte[cols.length][];
-    for(int i = 0; i < cols.length; i++) {
-      m_columns[i] = Bytes.toBytes(cols[i]);
-    }
-  }
-
-  /**
-   * Extract the grouping columns from value to construct a new key.
-   * 
-   * Pass the new key and value to reduce.
-   * If any of the grouping columns are not found in the value, the record is skipped.
-   * @param key 
-   * @param value 
-   * @param output 
-   * @param reporter 
-   * @throws IOException 
-   */
-  public void map(ImmutableBytesWritable key, Result value, 
-      OutputCollector<ImmutableBytesWritable,Result> output,
-      Reporter reporter) throws IOException {
-    
-    byte[][] keyVals = extractKeyValues(value);
-    if(keyVals != null) {
-      ImmutableBytesWritable tKey = createGroupKey(keyVals);
-      output.collect(tKey, value);
-    }
-  }
-
-  /**
-   * Extract columns values from the current record. This method returns
-   * null if any of the columns are not found.
-   * 
-   * Override this method if you want to deal with nulls differently.
-   * 
-   * @param r
-   * @return array of byte values
-   */
-  protected byte[][] extractKeyValues(Result r) {
-    byte[][] keyVals = null;
-    ArrayList<byte[]> foundList = new ArrayList<byte[]>();
-    int numCols = m_columns.length;
-    if (numCols > 0) {
-      for (KeyValue value: r.list()) {
-        byte [] column = value.getColumn();
-        for (int i = 0; i < numCols; i++) {
-          if (Bytes.equals(column, m_columns[i])) {
-            foundList.add(value.getValue());
-            break;
-          }
-        }
-      }
-      if(foundList.size() == numCols) {
-        keyVals = foundList.toArray(new byte[numCols][]);
-      }
-    }
-    return keyVals;
-  }
-
-  /**
-   * Create a key by concatenating multiple column values. 
-   * Override this function in order to produce different types of keys.
-   * 
-   * @param vals
-   * @return key generated by concatenating multiple column values
-   */
-  protected ImmutableBytesWritable createGroupKey(byte[][] vals) {
-    if(vals == null) {
-      return null;
-    }
-    StringBuilder sb =  new StringBuilder();
-    for(int i = 0; i < vals.length; i++) {
-      if(i > 0) {
-        sb.append(" ");
-      }
-      try {
-        sb.append(new String(vals[i], HConstants.UTF8_ENCODING));
-      } catch (UnsupportedEncodingException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    return new ImmutableBytesWritable(Bytes.toBytes(sb.toString()));
-  }
-}

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java?rev=789846&r1=789845&r2=789846&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java Tue Jun 30 17:42:53 2009
@@ -23,44 +23,47 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapreduce.Partitioner;
 
 /**
  * This is used to partition the output keys into groups of keys.
  * Keys are grouped according to the regions that currently exist
  * so that each reducer fills a single region so load is distributed.
  * 
- * @param <K2>
- * @param <V2>
+ * @param <KEY>  The type of the key.
+ * @param <VALUE>  The type of the value.
  */
-public class HRegionPartitioner<K2,V2> 
-implements Partitioner<ImmutableBytesWritable, V2> {
+public class HRegionPartitioner<KEY, VALUE> 
+extends Partitioner<ImmutableBytesWritable, VALUE>
+implements Configurable {
+  
   private final Log LOG = LogFactory.getLog(TableInputFormat.class);
+  private Configuration conf = null;
   private HTable table;
   private byte[][] startKeys; 
   
-  public void configure(JobConf job) {
-    try {
-      this.table = new HTable(new HBaseConfiguration(job), 
-        job.get(TableOutputFormat.OUTPUT_TABLE));
-    } catch (IOException e) {
-      LOG.error(e);
-    }
-    
-    try {
-      this.startKeys = this.table.getStartKeys();
-    } catch (IOException e) {
-      LOG.error(e);
-    }
-  }
-
+  /**
+   * Gets the partition number for a given key (hence record) given the total 
+   * number of partitions i.e. number of reduce-tasks for the job.
+   *   
+   * <p>Typically a hash function on all or a subset of the key.</p>
+   *
+   * @param key  The key to be partitioned.
+   * @param value  The entry value.
+   * @param numPartitions  The total number of partitions.
+   * @return The partition number for the <code>key</code>.
+   * @see org.apache.hadoop.mapreduce.Partitioner#getPartition(
+   *   java.lang.Object, java.lang.Object, int)
+   */
+  @Override
   public int getPartition(ImmutableBytesWritable key,
-      V2 value, int numPartitions) {
+      VALUE value, int numPartitions) {
     byte[] region = null;
     // Only one region return 0
     if (this.startKeys.length == 1){
@@ -86,4 +89,39 @@
     // if above fails to find start key that match we need to return something
     return 0;
   }
+
+  /**
+   * Returns the current configuration.
+   *  
+   * @return The current configuration.
+   * @see org.apache.hadoop.conf.Configurable#getConf()
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Sets the configuration. This is used to determine the start keys for the
+   * given table.
+   * 
+   * @param configuration  The configuration to set.
+   * @see org.apache.hadoop.conf.Configurable#setConf(
+   *   org.apache.hadoop.conf.Configuration)
+   */
+  @Override
+  public void setConf(Configuration configuration) {
+    this.conf = configuration;
+    try {
+      this.table = new HTable(new HBaseConfiguration(conf), 
+        configuration.get(TableOutputFormat.OUTPUT_TABLE));
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+    try {
+      this.startKeys = this.table.getStartKeys();
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+  }
 }
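
Because HRegionPartitioner is now Configurable, the framework injects the configuration via setConf when it instantiates the class, and that is where the region start keys get loaded from the table named under TableOutputFormat.OUTPUT_TABLE. A sketch of the wiring for a job writing to an existing table; the method and parameter names are placeholders:

    import org.apache.hadoop.hbase.mapreduce.HRegionPartitioner;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.mapreduce.Job;

    public class PartitionerWiring {
      static void usePartitioner(Job job, String table, int numRegions) {
        // setConf() on HRegionPartitioner reads this property to look up
        // the table's region start keys.
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
        job.setPartitionerClass(HRegionPartitioner.class);
        // Typically one reduce task per region so each reducer fills
        // exactly one region.
        job.setNumReduceTasks(numRegions);
      }
    }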

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMap.java?rev=789846&r1=789845&r2=789846&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMap.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMap.java Tue Jun 30 17:42:53 2009
@@ -1,74 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * Pass the given key and record as-is to reduce
- */
-public class IdentityTableMap
-extends MapReduceBase
-implements TableMap<ImmutableBytesWritable, Result> {
-
-  /** constructor */
-  public IdentityTableMap() {
-    super();
-  }
-
-  /**
-   * Use this before submitting a TableMap job. It will
-   * appropriately set up the JobConf.
-   * 
-   * @param table table name
-   * @param columns columns to scan
-   * @param mapper mapper class
-   * @param job job configuration
-   */
-  @SuppressWarnings("unchecked")
-  public static void initJob(String table, String columns,
-    Class<? extends TableMap> mapper, JobConf job) {
-    TableMapReduceUtil.initTableMapJob(table, columns, mapper,
-      ImmutableBytesWritable.class, Result.class, job);
-  }
-
-  /**
-   * Pass the key, value to reduce
-   * @param key 
-   * @param value 
-   * @param output 
-   * @param reporter 
-   * @throws IOException 
-   */
-  public void map(ImmutableBytesWritable key, Result value,
-      OutputCollector<ImmutableBytesWritable,Result> output,
-      Reporter reporter) throws IOException {
-    
-    // convert 
-    output.collect(key, value);
-  }
-}

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReduce.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReduce.java?rev=789846&r1=789845&r2=789846&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReduce.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReduce.java Tue Jun 30 17:42:53 2009
@@ -1,59 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * Write to table each key, record pair
- */
-public class IdentityTableReduce
-extends MapReduceBase
-implements TableReduce<ImmutableBytesWritable, Put> {
-  @SuppressWarnings("unused")
-  private static final Log LOG =
-    LogFactory.getLog(IdentityTableReduce.class.getName());
-  
-  /**
-   * No aggregation, output pairs of (key, record)
-   * @param key 
-   * @param values 
-   * @param output 
-   * @param reporter 
-   * @throws IOException 
-   */
-  public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
-    OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
-      throws IOException {
-    
-    while(values.hasNext()) {
-      output.collect(key, values.next());
-    }
-  }
-}

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IndexConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IndexConfiguration.java?rev=789846&r1=789845&r2=789846&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IndexConfiguration.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IndexConfiguration.java Tue Jun 30 17:42:53 2009
@@ -44,9 +44,10 @@
 import org.w3c.dom.Text;
 
 /**
- * Configuration parameters for building a Lucene index
+ * Configuration parameters for building a Lucene index.
  */
 public class IndexConfiguration extends Configuration {
+  
   private static final Log LOG = LogFactory.getLog(IndexConfiguration.class);
 
   static final String HBASE_COLUMN_NAME = "hbase.column.name";

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IndexOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IndexOutputFormat.java?rev=789846&r1=789845&r2=789846&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IndexOutputFormat.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IndexOutputFormat.java Tue Jun 30 17:42:53 2009
@@ -27,13 +27,10 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.search.Similarity;
 
@@ -41,29 +38,40 @@
  * Create a local index, unwrap Lucene documents created by reduce, add them to
  * the index, and copy the index to the destination.
  */
-public class IndexOutputFormat extends
-    FileOutputFormat<ImmutableBytesWritable, LuceneDocumentWrapper> {
+public class IndexOutputFormat 
+extends FileOutputFormat<ImmutableBytesWritable, LuceneDocumentWrapper> {
+  
   static final Log LOG = LogFactory.getLog(IndexOutputFormat.class);
 
+  /** Random generator. */
   private Random random = new Random();
 
+  /**
+   * Returns the record writer.
+   * 
+   * @param context  The current task context.
+   * @return The record writer.
+   * @throws IOException When there is an issue with the writer.
+   * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
+   */
   @Override
   public RecordWriter<ImmutableBytesWritable, LuceneDocumentWrapper>
-  getRecordWriter(final FileSystem fs, JobConf job, String name,
-      final Progressable progress)
+    getRecordWriter(TaskAttemptContext context)
   throws IOException {
 
-    final Path perm = new Path(FileOutputFormat.getOutputPath(job), name);
-    final Path temp = job.getLocalPath("index/_"
-        + Integer.toString(random.nextInt()));
+    final Path perm = new Path(FileOutputFormat.getOutputPath(context), 
+      FileOutputFormat.getUniqueFile(context, "part", ""));
+    // build the temporary index under one of the job's local directories
+    final Path temp = context.getConfiguration().getLocalPath(
+      "mapred.local.dir", "index/_" + Integer.toString(random.nextInt()));
 
     LOG.info("To index into " + perm);
-
+    FileSystem fs = FileSystem.get(context.getConfiguration());
     // delete old, if any
     fs.delete(perm, true);
 
     final IndexConfiguration indexConf = new IndexConfiguration();
-    String content = job.get("hbase.index.conf");
+    String content = context.getConfiguration().get("hbase.index.conf");
     if (content != null) {
       indexConf.addFromXML(content);
     }
@@ -99,65 +107,7 @@
       }
     }
     writer.setUseCompoundFile(indexConf.isUseCompoundFile());
-
-    return new RecordWriter<ImmutableBytesWritable, LuceneDocumentWrapper>() {
-      boolean closed;
-      private long docCount = 0;
-
-      public void write(ImmutableBytesWritable key, 
-          LuceneDocumentWrapper value)
-      throws IOException {
-        // unwrap and index doc
-        Document doc = value.get();
-        writer.addDocument(doc);
-        docCount++;
-        progress.progress();
-      }
-
-      public void close(final Reporter reporter) throws IOException {
-        // spawn a thread to give progress heartbeats
-        Thread prog = new Thread() {
-          @Override
-          public void run() {
-            while (!closed) {
-              try {
-                reporter.setStatus("closing");
-                Thread.sleep(1000);
-              } catch (InterruptedException e) {
-                continue;
-              } catch (Throwable e) {
-                return;
-              }
-            }
-          }
-        };
-
-        try {
-          prog.start();
-
-          // optimize index
-          if (indexConf.doOptimize()) {
-            if (LOG.isInfoEnabled()) {
-              LOG.info("Optimizing index.");
-            }
-            writer.optimize();
-          }
-
-          // close index
-          writer.close();
-          if (LOG.isInfoEnabled()) {
-            LOG.info("Done indexing " + docCount + " docs.");
-          }
-
-          // copy to perm destination in dfs
-          fs.completeLocalOutput(perm, temp);
-          if (LOG.isInfoEnabled()) {
-            LOG.info("Copy done.");
-          }
-        } finally {
-          closed = true;
-        }
-      }
-    };
+    return new IndexRecordWriter(context, fs, writer, indexConf, perm, temp);
   }
+  
 }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IndexTableReduce.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IndexTableReduce.java?rev=789846&r1=789845&r2=789846&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IndexTableReduce.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IndexTableReduce.java Tue Jun 30 17:42:53 2009
@@ -1,109 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * Construct a Lucene document per row, which is consumed by IndexOutputFormat
- * to build a Lucene index
- */
-public class IndexTableReduce extends MapReduceBase implements
-    Reducer<ImmutableBytesWritable, Result, ImmutableBytesWritable, LuceneDocumentWrapper> {
-  private static final Log LOG = LogFactory.getLog(IndexTableReduce.class);
-  private IndexConfiguration indexConf;
-
-  @Override
-  public void configure(JobConf job) {
-    super.configure(job);
-    indexConf = new IndexConfiguration();
-    String content = job.get("hbase.index.conf");
-    if (content != null) {
-      indexConf.addFromXML(content);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Index conf: " + indexConf);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    super.close();
-  }
-
-  public void reduce(ImmutableBytesWritable key, Iterator<Result> values,
-      OutputCollector<ImmutableBytesWritable, LuceneDocumentWrapper> output,
-      Reporter reporter)
-  throws IOException {
-    if (!values.hasNext()) {
-      return;
-    }
-
-    Document doc = new Document();
-
-    // index and store row key, row key already UTF-8 encoded
-    Field keyField = new Field(indexConf.getRowkeyName(),
-      Bytes.toString(key.get(), key.getOffset(), key.getLength()),
-      Field.Store.YES, Field.Index.UN_TOKENIZED);
-    keyField.setOmitNorms(true);
-    doc.add(keyField);
-
-    while (values.hasNext()) {
-      Result value = values.next();
-
-      // each column (name-value pair) is a field (name-value pair)
-      for (KeyValue kv: value.list()) {
-        // name is already UTF-8 encoded
-        String column = Bytes.toString(kv.getColumn());
-        byte[] columnValue = kv.getValue();
-        Field.Store store = indexConf.isStore(column)?
-          Field.Store.YES: Field.Store.NO;
-        Field.Index index = indexConf.isIndex(column)?
-          (indexConf.isTokenize(column)?
-            Field.Index.TOKENIZED: Field.Index.UN_TOKENIZED):
-            Field.Index.NO;
-
-        // UTF-8 encode value
-        Field field = new Field(column, Bytes.toString(columnValue), 
-          store, index);
-        field.setBoost(indexConf.getBoost(column));
-        field.setOmitNorms(indexConf.isOmitNorms(column));
-
-        doc.add(field);
-      }
-    }
-    output.collect(key, new LuceneDocumentWrapper(doc));
-  }
-}

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/LuceneDocumentWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/LuceneDocumentWrapper.java?rev=789846&r1=789845&r2=789846&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/LuceneDocumentWrapper.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/LuceneDocumentWrapper.java Tue Jun 30 17:42:53 2009
@@ -29,6 +29,8 @@
  * It doesn't really serialize/deserialize a lucene document.
  */
 public class LuceneDocumentWrapper implements Writable {
+  
+  /** The document to add to the index. */
   protected Document doc;
 
   /**

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java?rev=789846&r1=789845&r2=789846&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java Tue Jun 30 17:42:53 2009
@@ -21,73 +21,71 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
 
 /**
- * A job with a map to count rows.
- * Map outputs table rows IF the input row has columns that have content.  
- * Uses an {@link IdentityReducer}
+ * A job with just a map phase to count rows. Map outputs table rows IF the 
+ * input row has columns that have content.
  */
-public class RowCounter extends Configured implements Tool {
-  // Name of this 'program'
+public class RowCounter {
+  
+  /** Name of this 'program'. */
   static final String NAME = "rowcounter";
 
   /**
    * Mapper that runs the count.
    */
   static class RowCounterMapper
-  implements TableMap<ImmutableBytesWritable, Result> {
-    private static enum Counters {ROWS}
-
+  extends TableMapper<ImmutableBytesWritable, Result> {
+    
+    /** Counter enumeration to count the actual rows. */
+    private static enum Counters { ROWS }
+
+    /**
+     * Maps the data.
+     * 
+     * @param row  The current table row key.
+     * @param values  The columns.
+     * @param context  The current context.
+     * @throws IOException When something is broken with the data.
+     * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN, 
+     *   org.apache.hadoop.mapreduce.Mapper.Context)
+     */
+    @Override
     public void map(ImmutableBytesWritable row, Result values,
-        OutputCollector<ImmutableBytesWritable, Result> output,
-        Reporter reporter)
+      Context context)
     throws IOException {
-      boolean content = !values.list().isEmpty();
       for (KeyValue value: values.list()) {
         if (value.getValue().length > 0) {
-          content = true;
+          context.getCounter(Counters.ROWS).increment(1);
           break;
         }
       }
-      if (!content) {
-        // Don't count rows that are all empty values.
-        return;
-      }
-      // Give out same value every time.  We're only interested in the row/key
-      reporter.incrCounter(Counters.ROWS, 1);
-    }
-
-    public void configure(JobConf jc) {
-      // Nothing to do.
     }
 
-    public void close() throws IOException {
-      // Nothing to do.
-    }
   }
 
   /**
-   * @param args
-   * @return the JobConf
-   * @throws IOException
+   * Sets up the actual job.
+   * 
+   * @param conf  The current configuration.
+   * @param args  The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
    */
-  public JobConf createSubmittableJob(String[] args) throws IOException {
-    JobConf c = new JobConf(getConf(), getClass());
-    c.setJobName(NAME);
+  public static Job createSubmittableJob(Configuration conf, String[] args) 
+  throws IOException {
+    Job job = new Job(conf, NAME);
+    job.setJarByClass(RowCounter.class);
     // Columns are space delimited
     StringBuilder sb = new StringBuilder();
     final int columnoffset = 2;
@@ -97,38 +95,34 @@
       }
       sb.append(args[i]);
     }
+    Scan scan = new Scan();
+    scan.addColumns(sb.toString());
     // Second argument is the table name.
-    TableMapReduceUtil.initTableMapJob(args[1], sb.toString(),
-      RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, c);
-    c.setNumReduceTasks(0);
-    // First arg is the output directory.
-    FileOutputFormat.setOutputPath(c, new Path(args[0]));
-    return c;
-  }
-  
-  static int printUsage() {
-    System.out.println(NAME +
-      " <outputdir> <tablename> <column1> [<column2>...]");
-    return -1;
-  }
-  
-  public int run(final String[] args) throws Exception {
-    // Make sure there are at least 3 parameters
-    if (args.length < 3) {
-      System.err.println("ERROR: Wrong number of parameters: " + args.length);
-      return printUsage();
-    }
-    JobClient.runJob(createSubmittableJob(args));
-    return 0;
+    TableMapReduceUtil.initTableMapperJob(args[1], scan,
+      RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
+    job.setNumReduceTasks(0);
+    // first argument is the output directory.
+    FileOutputFormat.setOutputPath(job, new Path(args[0]));
+    return job;
   }
 
   /**
-   * @param args
-   * @throws Exception
+   * Main entry point.
+   * 
+   * @param args  The command line parameters.
+   * @throws Exception When running the job fails.
    */
   public static void main(String[] args) throws Exception {
-    HBaseConfiguration c = new HBaseConfiguration();
-    int errCode = ToolRunner.run(c, new RowCounter(), args);
-    System.exit(errCode);
+    HBaseConfiguration conf = new HBaseConfiguration();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    if (otherArgs.length < 3) {
+      System.err.println("ERROR: Wrong number of parameters: " + otherArgs.length);
+      System.err.println("Usage: " + NAME + 
+        " <outputdir> <tablename> <column1> [<column2>...]");
+      System.exit(-1);
+    }
+    Job job = createSubmittableJob(conf, otherArgs);
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
+  
 }
\ No newline at end of file
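
The counter idiom RowCounter switches to is the main behavioral difference: the Context hands out named counters directly instead of going through Reporter.incrCounter. A stripped-down sketch, assuming TableMapper as used above; the class and enum names are arbitrary:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;

    public class CountingMapper
    extends TableMapper<ImmutableBytesWritable, Result> {

      enum Counters { ROWS }

      @Override
      public void map(ImmutableBytesWritable row, Result values,
          Context context) throws IOException {
        // context.getCounter() replaces the old Reporter.incrCounter().
        context.getCounter(Counters.ROWS).increment(1);
      }
    }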

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java?rev=789846&r1=789845&r2=789846&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java Tue Jun 30 17:42:53 2009
@@ -23,60 +23,64 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.util.StringUtils;
 
 /**
  * Convert HBase tabular data into a format that is consumable by Map/Reduce.
  */
-public class TableInputFormat extends TableInputFormatBase implements
-    JobConfigurable {
+public class TableInputFormat extends TableInputFormatBase 
+implements Configurable {
+  
   private final Log LOG = LogFactory.getLog(TableInputFormat.class);
+  
+  /** Job parameter that specifies the input table. */
+  public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
+  /** Job parameter that specifies the scan, in serialized string form. */
+  public static final String SCAN = "hbase.mapreduce.scan";
+  
+  /** The configuration. */
+  private Configuration conf = null;
 
   /**
-   * space delimited list of columns
+   * Returns the current configuration.
+   *  
+   * @return The current configuration.
+   * @see org.apache.hadoop.conf.Configurable#getConf()
    */
-  public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
 
-  public void configure(JobConf job) {
-    Path[] tableNames = FileInputFormat.getInputPaths(job);
-    String colArg = job.get(COLUMN_LIST);
-    String[] colNames = colArg.split(" ");
-    byte [][] m_cols = new byte[colNames.length][];
-    for (int i = 0; i < m_cols.length; i++) {
-      m_cols[i] = Bytes.toBytes(colNames[i]);
-    }
-    setInputColumns(m_cols);
+  /**
+   * Sets the configuration. This is used to set the details for the table to
+   * be scanned.
+   * 
+   * @param configuration  The configuration to set.
+   * @see org.apache.hadoop.conf.Configurable#setConf(
+   *   org.apache.hadoop.conf.Configuration)
+   */
+  @Override
+  public void setConf(Configuration configuration) {
+    this.conf = configuration;
+    String tableName = conf.get(INPUT_TABLE);
     try {
-      setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
+      setHTable(new HTable(new HBaseConfiguration(conf), tableName));
     } catch (Exception e) {
       LOG.error(StringUtils.stringifyException(e));
     }
-  }
-
-  public void validateInput(JobConf job) throws IOException {
-    // expecting exactly one path
-    Path [] tableNames = FileInputFormat.getInputPaths(job);
-    if (tableNames == null || tableNames.length > 1) {
-      throw new IOException("expecting one table name");
-    }
-
-    // connected to table?
-    if (getHTable() == null) {
-      throw new IOException("could not connect to table '" +
-        tableNames[0].getName() + "'");
-    }
-
-    // expecting at least one column
-    String colArg = job.get(COLUMN_LIST);
-    if (colArg == null || colArg.length() == 0) {
-      throw new IOException("expecting at least one column");
+    Scan scan = null;
+    try {
+      scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
+    } catch (IOException e) {
+      LOG.error("An error occurred.", e);
     }
+    setScan(scan);
   }
+  
 }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=789846&r1=789845&r2=789846&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java Tue Jun 30 17:42:53 2009
@@ -20,8 +20,8 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,23 +31,19 @@
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.hbase.filter.StopRowFilter;
-import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.StringUtils;
 
 /**
- * A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a
- * byte[] of input columns and optionally a {@link RowFilterInterface}.
- * Subclasses may use other TableRecordReader implementations.
+ * A base for {@link TableInputFormat}s. Receives a {@link HTable} and a 
+ * {@link Scan} instance that defines the input columns etc. Subclasses may use 
+ * other TableRecordReader implementations.
  * <p>
  * An example of a subclass:
  * <pre>
@@ -72,155 +68,146 @@
  *  }
  * </pre>
  */
-
 public abstract class TableInputFormatBase
-implements InputFormat<ImmutableBytesWritable, Result> {
+extends InputFormat<ImmutableBytesWritable, Result> {
+  
   final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
-  private byte [][] inputColumns;
-  private HTable table;
-  private TableRecordReader tableRecordReader;
-  private RowFilterInterface rowFilter;
+
+  /** Holds the details for the internal scanner. */
+  private Scan scan = null;
+  /** The table to scan. */
+  private HTable table = null;
+  /** The reader scanning the table, can be a custom one. */
+  private TableRecordReader tableRecordReader = null;
 
   /**
-   * Iterate over an HBase table data, return (Text, RowResult) pairs
+   * Iterates over HBase table data, returning (ImmutableBytesWritable, Result) 
+   * pairs.
    */
   protected class TableRecordReader
-  implements RecordReader<ImmutableBytesWritable, Result> {
-    private byte [] startRow;
-    private byte [] endRow;
-    private byte [] lastRow;
-    private RowFilterInterface trrRowFilter;
-    private ResultScanner scanner;
-    private HTable htable;
-    private byte [][] trrInputColumns;
+  extends RecordReader<ImmutableBytesWritable, Result> {
+    
+    private ResultScanner scanner = null;
+    private Scan scan = null;
+    private HTable htable = null;
+    private byte[] lastRow = null;
+    private ImmutableBytesWritable key = null;
+    private Result value = null;
 
     /**
      * Restart from survivable exceptions by creating a new scanner.
      *
-     * @param firstRow
-     * @throws IOException
+     * @param firstRow  The first row to start at.
+     * @throws IOException When restarting fails.
      */
     public void restart(byte[] firstRow) throws IOException {
-      if ((endRow != null) && (endRow.length > 0)) {
-        if (trrRowFilter != null) {
-          final Set<RowFilterInterface> rowFiltersSet =
-            new HashSet<RowFilterInterface>();
-          rowFiltersSet.add(new WhileMatchRowFilter(new StopRowFilter(endRow)));
-          rowFiltersSet.add(trrRowFilter);
-          Scan scan = new Scan(startRow);
-          scan.addColumns(trrInputColumns);
-//          scan.setFilter(new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
-//              rowFiltersSet));
-          this.scanner = this.htable.getScanner(scan);
-        } else {
-          Scan scan = new Scan(firstRow, endRow);
-          scan.addColumns(trrInputColumns);
-          this.scanner = this.htable.getScanner(scan);
-        }
-      } else {
-        Scan scan = new Scan(firstRow);
-        scan.addColumns(trrInputColumns);
-//        scan.setFilter(trrRowFilter);
-        this.scanner = this.htable.getScanner(scan);
-      }
+      Scan newScan = new Scan(scan);
+      newScan.setStartRow(firstRow);
+      this.scanner = this.htable.getScanner(newScan);      
     }
 
     /**
      * Build the scanner. Not done in constructor to allow for extension.
      *
-     * @throws IOException
+     * @throws IOException When restarting the scan fails. 
      */
     public void init() throws IOException {
-      restart(startRow);
+      restart(scan.getStartRow());
     }
 
     /**
-     * @param htable the {@link HTable} to scan.
+     * Sets the HBase table.
+     * 
+     * @param htable  The {@link HTable} to scan.
      */
     public void setHTable(HTable htable) {
       this.htable = htable;
     }
 
     /**
-     * @param inputColumns the columns to be placed in {@link Result}.
-     */
-    public void setInputColumns(final byte [][] inputColumns) {
-      this.trrInputColumns = inputColumns;
-    }
-
-    /**
-     * @param startRow the first row in the split
-     */
-    public void setStartRow(final byte [] startRow) {
-      this.startRow = startRow;
-    }
-
-    /**
-     *
-     * @param endRow the last row in the split
+     * Sets the scan that defines details such as the columns to read.
+     *
+     * @param scan  The scan to set.
      */
-    public void setEndRow(final byte [] endRow) {
-      this.endRow = endRow;
+    public void setScan(Scan scan) {
+      this.scan = scan;
     }
 
     /**
-     * @param rowFilter the {@link RowFilterInterface} to be used.
+     * Closes the scanner.
+     * 
+     * @see org.apache.hadoop.mapreduce.RecordReader#close()
      */
-    public void setRowFilter(RowFilterInterface rowFilter) {
-      this.trrRowFilter = rowFilter;
-    }
-
+    @Override
     public void close() {
       this.scanner.close();
     }
 
     /**
-     * @return ImmutableBytesWritable
-     *
-     * @see org.apache.hadoop.mapred.RecordReader#createKey()
+     * Returns the current key.
+     *  
+     * @return The current key.
+     * @throws IOException
+     * @throws InterruptedException When the job is aborted.
+     * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
      */
-    public ImmutableBytesWritable createKey() {
-      return new ImmutableBytesWritable();
+    @Override
+    public ImmutableBytesWritable getCurrentKey() throws IOException,
+        InterruptedException {
+      return key;
     }
 
     /**
-     * @return RowResult
-     *
-     * @see org.apache.hadoop.mapred.RecordReader#createValue()
+     * Returns the current value.
+     * 
+     * @return The current value.
+     * @throws IOException When the value is faulty.
+     * @throws InterruptedException When the job is aborted.
+     * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
      */
-    public Result createValue() {
-      return new Result();
+    @Override
+    public Result getCurrentValue() throws IOException, InterruptedException {
+      return value;
     }
 
-    public long getPos() {
-      // This should be the ordinal tuple in the range;
-      // not clear how to calculate...
-      return 0;
-    }
-
-    public float getProgress() {
-      // Depends on the total number of tuples and getPos
-      return 0;
+    /**
+     * Initializes the reader.
+     * 
+     * @param inputsplit  The split to work with.
+     * @param context  The current task context.
+     * @throws IOException When setting up the reader fails.
+     * @throws InterruptedException When the job is aborted.
+     * @see org.apache.hadoop.mapreduce.RecordReader#initialize(
+     *   org.apache.hadoop.mapreduce.InputSplit, 
+     *   org.apache.hadoop.mapreduce.TaskAttemptContext)
+     */
+    @Override
+    public void initialize(InputSplit inputsplit,
+        TaskAttemptContext context) throws IOException,
+        InterruptedException {
     }
 
     /**
-     * @param key HStoreKey as input key.
-     * @param value MapWritable as input value
-     * @return true if there was more data
-     * @throws IOException
+     * Positions the record reader to the next record.
+     *  
+     * @return <code>true</code> if there was another record.
+     * @throws IOException When reading the record failed.
+     * @throws InterruptedException When the job is aborted.
+     * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
      */
-    public boolean next(ImmutableBytesWritable key, Result value)
-    throws IOException {
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (key == null) key = new ImmutableBytesWritable();
+      if (value == null) value = new Result();
       Result result;
       try {
         result = this.scanner.next();
       } catch (UnknownScannerException e) {
         LOG.debug("recovered from " + StringUtils.stringifyException(e));  
         restart(lastRow);
-        this.scanner.next();    // skip presumed already mapped row
-        result = this.scanner.next();
+        scanner.next();    // skip presumed already mapped row
+        result = scanner.next();
       }
-
       if (result != null && result.size() > 0) {
         key.set(result.getRow());
         lastRow = key.get();
@@ -229,17 +216,35 @@
       }
       return false;
     }
+
+    /**
+     * The current progress of the record reader through its data.
+     * 
+     * @return A number between 0.0 and 1.0, the fraction of the data read.
+     * @see org.apache.hadoop.mapreduce.RecordReader#getProgress()
+     */
+    @Override
+    public float getProgress() {
+      // Not computable without knowing the total number of rows.
+      return 0;
+    }
   }
 
   /**
    * Builds a TableRecordReader. If no TableRecordReader was provided, uses
    * the default.
-   *
-   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
-   *      JobConf, Reporter)
+   * 
+   * @param split  The split to work with.
+   * @param context  The current context.
+   * @return The newly created record reader.
+   * @throws IOException When creating the reader fails.
+   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
+   *   org.apache.hadoop.mapreduce.InputSplit, 
+   *   org.apache.hadoop.mapreduce.TaskAttemptContext)
    */
-  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
-      InputSplit split, JobConf job, Reporter reporter)
+  @Override
+  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
+      InputSplit split, TaskAttemptContext context)
   throws IOException {
     TableSplit tSplit = (TableSplit) split;
     TableRecordReader trr = this.tableRecordReader;
@@ -247,45 +252,38 @@
     if (trr == null) {
       trr = new TableRecordReader();
     }
-    trr.setStartRow(tSplit.getStartRow());
-    trr.setEndRow(tSplit.getEndRow());
-    trr.setHTable(this.table);
-    trr.setInputColumns(this.inputColumns);
-    trr.setRowFilter(this.rowFilter);
+    Scan sc = new Scan(scan);
+    sc.setStartRow(tSplit.getStartRow());
+    sc.setStopRow(tSplit.getEndRow());
+    trr.setScan(sc);
+    trr.setHTable(table);
     trr.init();
     return trr;
   }
 
   /**
-   * Calculates the splits that will serve as input for the map tasks.
-   * <ul>
-   * Splits are created in number equal to the smallest between numSplits and
-   * the number of {@link HRegion}s in the table. If the number of splits is
-   * smaller than the number of {@link HRegion}s then splits are spanned across
-   * multiple {@link HRegion}s and are grouped the most evenly possible. In the
-   * case splits are uneven the bigger splits are placed first in the
-   * {@link InputSplit} array.
-   *
-   * @param job the map task {@link JobConf}
-   * @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
-   *
-   * @return the input splits
+   * Calculates the splits that will serve as input for the map tasks. The
+   * number of splits matches the number of regions in a table.
    *
-   * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
+   * @param context  The current job context.
+   * @return The list of input splits.
+   * @throws IOException When creating the list of splits fails.
+   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
+   *   org.apache.hadoop.mapreduce.JobContext)
    */
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    byte [][] startKeys = this.table.getStartKeys();
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    if (table == null) {
+      throw new IOException("No table was provided.");
+    }
+    byte [][] startKeys = table.getStartKeys();
     if (startKeys == null || startKeys.length == 0) {
-      throw new IOException("Expecting at least one region");
+      throw new IOException("Expecting at least one region.");
     }
-    if (this.table == null) {
-      throw new IOException("No table was provided");
-    }
-    if (this.inputColumns == null || this.inputColumns.length == 0) {
-      throw new IOException("Expecting at least one column");
+    if (!scan.hasFamilies()) {
+      throw new IOException("Expecting at least one column.");
     }
-    int realNumSplits = numSplits > startKeys.length? startKeys.length:
-      numSplits;
+    int realNumSplits = startKeys.length;
     InputSplit[] splits = new InputSplit[realNumSplits];
     int middle = startKeys.length / realNumSplits;
     int startPos = 0;
@@ -300,14 +298,7 @@
       LOG.info("split: " + i + "->" + splits[i]);
       startPos = lastPos;
     }
-    return splits;
-  }
-
-  /**
-   * @param inputColumns to be passed in {@link Result} to the map task.
-   */
-  protected void setInputColumns(byte [][] inputColumns) {
-    this.inputColumns = inputColumns;
+    return Arrays.asList(splits);
   }
 
   /**
@@ -320,28 +311,39 @@
   /**
    * Allows subclasses to set the {@link HTable}.
    *
-   * @param table to get the data from
+   * @param table  The table to get the data from.
    */
   protected void setHTable(HTable table) {
     this.table = table;
   }
 
   /**
-   * Allows subclasses to set the {@link TableRecordReader}.
-   *
-   * @param tableRecordReader
-   *                to provide other {@link TableRecordReader} implementations.
+   * Gets the scan that defines details such as the columns to read.
+   *
+   * @return The internal scan instance.
    */
-  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
-    this.tableRecordReader = tableRecordReader;
+  public Scan getScan() {
+    if (scan == null) scan = new Scan();
+    return scan;
   }
 
   /**
-   * Allows subclasses to set the {@link RowFilterInterface} to be used.
+   * Sets the scan that defines details such as the columns to read.
+   *
+   * @param scan  The scan to set.
+   */
+  public void setScan(Scan scan) {
+    this.scan = scan;
+  }
+
+  /**
+   * Allows subclasses to set the {@link TableRecordReader}.
    *
-   * @param rowFilter
+   * @param tableRecordReader A different {@link TableRecordReader} 
+   *   implementation.
    */
-  protected void setRowFilter(RowFilterInterface rowFilter) {
-    this.rowFilter = rowFilter;
+  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+    this.tableRecordReader = tableRecordReader;
   }
+
 }
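
A sketch of what a subclass looks like against the new API, assuming only the setHTable()/setScan() hooks shown above; the table name, column family, and class name are invented for illustration and not part of the patch:

package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

/** Hypothetical subclass: scans one fixed table and family. */
public class ExampleTableInputFormat extends TableInputFormatBase {

  public ExampleTableInputFormat() throws IOException {
    // The two hooks the base class exposes to subclasses.
    setHTable(new HTable(new HBaseConfiguration(), "example_table"));
    Scan scan = new Scan();
    // getSplits() rejects a scan without at least one column family.
    scan.addFamily(Bytes.toBytes("content"));
    setScan(scan);
  }
}

With the numSplits hint gone, getSplits() now always returns exactly one split per region, which is the second half of this issue.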

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMap.java?rev=789846&r1=789845&r2=789846&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMap.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMap.java Tue Jun 30 17:42:53 2009
@@ -1,38 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.Mapper;
-
-/**
- * Scan an HBase table to sort by a specified sort column.
- * If the column does not exist, the record is not passed to Reduce.
- *
- * @param <K> WritableComparable key class
- * @param <V> Writable value class
- */
-public interface TableMap<K extends WritableComparable<K>, V extends Writable>
-extends Mapper<ImmutableBytesWritable, Result, K, V> {
-
-}
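
The removed TableMap interface is superseded by the TableMapper base class that TableMapReduceUtil references below (added in the other half of this commit). A sketch of a mapper against it, assuming TableMapper fixes Mapper's input types to (ImmutableBytesWritable, Result); the class and counter names are invented:

package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

/** Illustrative mapper: counts the rows TableInputFormat hands in. */
public class ExampleRowCounterMapper
extends TableMapper<ImmutableBytesWritable, Result> {

  @Override
  public void map(ImmutableBytesWritable row, Result values, Context context)
  throws IOException {
    // One call per row of the scan; just bump a counter.
    context.getCounter("example", "rows").increment(1);
  }
}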

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=789846&r1=789845&r2=789846&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Tue Jun 30 17:42:53 2009
@@ -19,45 +19,80 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
 
 /**
- * Utility for {@link TableMap} and {@link TableReduce}
+ * Utility for {@link TableMapper} and {@link TableReducer} jobs.
  */
 @SuppressWarnings("unchecked")
 public class TableMapReduceUtil {
   
   /**
-   * Use this before submitting a TableMap job. It will
-   * appropriately set up the JobConf.
+   * Use this before submitting a TableMapper job. It will appropriately set
+   * up the job.
    * 
    * @param table  The table name to read from.
-   * @param columns  The columns to scan.
+   * @param scan  The scan instance with the columns, time range etc.
    * @param mapper  The mapper class to use.
    * @param outputKeyClass  The class of the output key.
    * @param outputValueClass  The class of the output value.
-   * @param job  The current job configuration to adjust.
+   * @param job  The current job to adjust.
+   * @throws IOException When setting up the details fails.
    */
-  public static void initTableMapJob(String table, String columns,
-    Class<? extends TableMap> mapper, 
-    Class<? extends WritableComparable> outputKeyClass, 
-    Class<? extends Writable> outputValueClass, JobConf job) {
-      
-    job.setInputFormat(TableInputFormat.class);
+  public static void initTableMapperJob(String table, Scan scan,
+      Class<? extends TableMapper> mapper, 
+      Class<? extends WritableComparable> outputKeyClass, 
+      Class<? extends Writable> outputValueClass, Job job) throws IOException {
+    job.setInputFormatClass(TableInputFormat.class);
     job.setMapOutputValueClass(outputValueClass);
     job.setMapOutputKeyClass(outputKeyClass);
     job.setMapperClass(mapper);
-    FileInputFormat.addInputPaths(job, table);
-    job.set(TableInputFormat.COLUMN_LIST, columns);
+    job.getConfiguration().set(TableInputFormat.INPUT_TABLE, table);
+    job.getConfiguration().set(TableInputFormat.SCAN, 
+      convertScanToString(scan));
+  }
+
+  /**
+   * Writes the given scan into a Base64 encoded string.
+   * 
+   * @param scan  The scan to write out.
+   * @return The scan saved in a Base64 encoded string.
+   * @throws IOException When writing the scan fails.
+   */
+  static String convertScanToString(Scan scan) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();  
+    DataOutputStream dos = new DataOutputStream(out);
+    scan.write(dos);
+    return Base64.encodeBytes(out.toByteArray());
+  }
+
+  /**
+   * Converts the given Base64 string back into a Scan instance.
+   * 
+   * @param base64  The scan details.
+   * @return The newly created Scan instance.
+   * @throws IOException When reading the scan instance fails.
+   */
+  static Scan convertStringToScan(String base64) throws IOException {
+    ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64));
+    DataInputStream dis = new DataInputStream(bis);
+    Scan scan = new Scan();
+    scan.readFields(dis);
+    return scan;
   }
   
   /**
@@ -66,13 +101,13 @@
    * 
    * @param table  The output table.
    * @param reducer  The reducer class to use.
-   * @param job  The current job configuration to adjust.
+   * @param job  The current job to adjust.
    * @throws IOException When determining the region count fails. 
    */
-  public static void initTableReduceJob(String table,
-    Class<? extends TableReduce> reducer, JobConf job)
+  public static void initTableReducerJob(String table,
+    Class<? extends TableReducer> reducer, Job job)
   throws IOException {
-    initTableReduceJob(table, reducer, job, null);
+    initTableReducerJob(table, reducer, job, null);
   }
 
   /**
@@ -81,22 +116,23 @@
    * 
    * @param table  The output table.
    * @param reducer  The reducer class to use.
-   * @param job  The current job configuration to adjust.
+   * @param job  The current job to adjust.
    * @param partitioner  Partitioner to use. Pass <code>null</code> to use 
    * default partitioner.
    * @throws IOException When determining the region count fails. 
    */
-  public static void initTableReduceJob(String table,
-    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
+  public static void initTableReducerJob(String table,
+    Class<? extends TableReducer> reducer, Job job, Class partitioner)
   throws IOException {
-    job.setOutputFormat(TableOutputFormat.class);
+    job.setOutputFormatClass(TableOutputFormat.class);
     job.setReducerClass(reducer);
-    job.set(TableOutputFormat.OUTPUT_TABLE, table);
+    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
     job.setOutputKeyClass(ImmutableBytesWritable.class);
     job.setOutputValueClass(Put.class);
     if (partitioner == HRegionPartitioner.class) {
       job.setPartitionerClass(HRegionPartitioner.class);
-      HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+      HTable outputTable = new HTable(new HBaseConfiguration(
+        job.getConfiguration()), table);
       int regions = outputTable.getRegionsInfo().size();
       if (job.getNumReduceTasks() > regions) {
         job.setNumReduceTasks(outputTable.getRegionsInfo().size());
@@ -111,73 +147,45 @@
    * configuration does not exceed the number of regions for the given table. 
    * 
    * @param table  The table to get the region count for.
-   * @param job  The current job configuration to adjust.
+   * @param job  The current job to adjust.
    * @throws IOException When retrieving the table details fails.
    */
-  public static void limitNumReduceTasks(String table, JobConf job) 
+  public static void limitNumReduceTasks(String table, Job job) 
   throws IOException { 
-    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+    HTable outputTable = new HTable(new HBaseConfiguration(
+      job.getConfiguration()), table);
     int regions = outputTable.getRegionsInfo().size();
     if (job.getNumReduceTasks() > regions)
       job.setNumReduceTasks(regions);
   }
 
   /**
-   * Ensures that the given number of map tasks for the given job 
-   * configuration does not exceed the number of regions for the given table. 
-   * 
-   * @param table  The table to get the region count for.
-   * @param job  The current job configuration to adjust.
-   * @throws IOException When retrieving the table details fails.
-   */
-  public static void limitNumMapTasks(String table, JobConf job) 
-  throws IOException { 
-    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
-    int regions = outputTable.getRegionsInfo().size();
-    if (job.getNumMapTasks() > regions)
-      job.setNumMapTasks(regions);
-  }
-
-  /**
    * Sets the number of reduce tasks for the given job configuration to the 
    * number of regions the given table has. 
    * 
    * @param table  The table to get the region count for.
-   * @param job  The current job configuration to adjust.
+   * @param job  The current job to adjust.
    * @throws IOException When retrieving the table details fails.
    */
-  public static void setNumReduceTasks(String table, JobConf job) 
+  public static void setNumReduceTasks(String table, Job job) 
   throws IOException { 
-    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+    HTable outputTable = new HTable(new HBaseConfiguration(
+      job.getConfiguration()), table);
     int regions = outputTable.getRegionsInfo().size();
     job.setNumReduceTasks(regions);
   }
   
   /**
-   * Sets the number of map tasks for the given job configuration to the 
-   * number of regions the given table has. 
-   * 
-   * @param table  The table to get the region count for.
-   * @param job  The current job configuration to adjust.
-   * @throws IOException When retrieving the table details fails.
-   */
-  public static void setNumMapTasks(String table, JobConf job) 
-  throws IOException { 
-    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
-    int regions = outputTable.getRegionsInfo().size();
-    job.setNumMapTasks(regions);
-  }
-  
-  /**
    * Sets the number of rows to return and cache with each scanner iteration.
    * Higher caching values will enable faster mapreduce jobs at the expense of
    * requiring more heap to contain the cached rows.
    * 
-   * @param job The current job configuration to adjust.
+   * @param job The current job to adjust.
    * @param batchSize The number of rows to return in batch with each scanner
    * iteration.
    */
-  public static void setScannerCaching(JobConf job, int batchSize) {
-    job.setInt("hbase.client.scanner.caching", batchSize);
+  public static void setScannerCaching(Job job, int batchSize) {
+    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
   }
+  
 }
\ No newline at end of file
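
The Base64 round-trip above is what lets a fully configured client-side Scan travel through the job configuration to the tasks, where TableInputFormat restores it with convertStringToScan(). A driver sketch showing the flow, reusing the hypothetical mapper from earlier; table, family, and job names are invented:

package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

/** Hypothetical driver: wires a Scan into a map-only counting job. */
public class ExampleDriver {

  public static void main(String[] args) throws Exception {
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("content"));
    Job job = new Job(new HBaseConfiguration(), "example-rowcount");
    job.setJarByClass(ExampleDriver.class);
    // Sets TableInputFormat.INPUT_TABLE and serializes the scan into
    // TableInputFormat.SCAN via convertScanToString().
    TableMapReduceUtil.initTableMapperJob("example_table", scan,
      ExampleRowCounterMapper.class, ImmutableBytesWritable.class,
      Result.class, job);
    job.setNumReduceTasks(0);  // map-only
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}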

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java?rev=789846&r1=789845&r2=789846&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java Tue Jun 30 17:42:53 2009
@@ -23,68 +23,89 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapred.InvalidJobConfException;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 /**
- * Convert Map/Reduce output and write it to an HBase table
+ * Convert Map/Reduce output and write it to an HBase table.
  */
 public class TableOutputFormat extends
     FileOutputFormat<ImmutableBytesWritable, Put> {
 
-  /** JobConf parameter that specifies the output table */
-  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
   private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
+  /** Job parameter that specifies the output table. */
+  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
 
   /**
    * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable) 
    * and write to an HBase table
    */
   protected static class TableRecordWriter
-    implements RecordWriter<ImmutableBytesWritable, Put> {
-    private HTable m_table;
+    extends RecordWriter<ImmutableBytesWritable, Put> {
+    
+    /** The table to write to. */
+    private HTable table;
 
     /**
      * Instantiate a TableRecordWriter with the HBase HClient for writing.
      * 
-     * @param table
+     * @param table  The table to write to.
      */
     public TableRecordWriter(HTable table) {
-      m_table = table;
+      this.table = table;
     }
 
-    public void close(Reporter reporter) 
-      throws IOException {
-      m_table.flushCommits();
+    /**
+     * Closes the writer, in this case flushing any pending table commits.
+     * 
+     * @param context  The context.
+     * @throws IOException When closing the writer fails.
+     * @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
+     */
+    @Override
+    public void close(TaskAttemptContext context) 
+    throws IOException {
+      table.flushCommits();
     }
 
-    public void write(ImmutableBytesWritable key,
-        Put value) throws IOException {
-      m_table.put(new Put(value));
+    /**
+     * Writes a key/value pair into the table.
+     * 
+     * @param key  The key.
+     * @param value  The value.
+     * @throws IOException When writing fails.
+     * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
+     */
+    @Override
+    public void write(ImmutableBytesWritable key, Put value) 
+    throws IOException {
+      table.put(new Put(value));
     }
   }
   
-  @Override
-  @SuppressWarnings("unchecked")
-  public RecordWriter getRecordWriter(FileSystem ignored,
-      JobConf job, String name, Progressable progress) throws IOException {
-    
+  /**
+   * Creates a new record writer.
+   * 
+   * @param context  The current task context.
+   * @return The newly created writer instance.
+   * @throws IOException When creating the writer fails.
+   * @throws InterruptedException When the job is cancelled.
+   * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
+   */
+  public RecordWriter<ImmutableBytesWritable, Put> getRecordWriter(
+    TaskAttemptContext context) 
+  throws IOException, InterruptedException {
     // expecting exactly one path
-    
-    String tableName = job.get(OUTPUT_TABLE);
+    String tableName = context.getConfiguration().get(OUTPUT_TABLE);
     HTable table = null;
     try {
-      table = new HTable(new HBaseConfiguration(job), tableName);
+      table = new HTable(new HBaseConfiguration(context.getConfiguration()), 
+        tableName);
     } catch(IOException e) {
       LOG.error(e);
       throw e;
@@ -92,14 +113,5 @@
     table.setAutoFlush(false);
     return new TableRecordWriter(table);
   }
-
-  @Override
-  public void checkOutputSpecs(FileSystem ignored, JobConf job)
-  throws FileAlreadyExistsException, InvalidJobConfException, IOException {
-    
-    String tableName = job.get(OUTPUT_TABLE);
-    if(tableName == null) {
-      throw new IOException("Must specify table name");
-    }
-  }
+  
 }
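
On this write path a reducer only has to emit (ImmutableBytesWritable, Put) pairs; TableRecordWriter buffers the puts (auto-flush is off) and flushCommits() runs on close(). A reducer sketch extending Hadoop's plain Reducer, so nothing beyond the classes in this patch is assumed; family, qualifier, and class names are invented:

package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

/** Illustrative reducer: sums per-row counts into one Put per row. */
public class ExampleSummingReducer extends Reducer<
    ImmutableBytesWritable, IntWritable, ImmutableBytesWritable, Put> {

  @Override
  public void reduce(ImmutableBytesWritable row,
      Iterable<IntWritable> values, Context context)
  throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) sum += val.get();
    Put put = new Put(row.get());
    put.add(Bytes.toBytes("stats"), Bytes.toBytes("total"),
      Bytes.toBytes(sum));
    context.write(row, put);  // handed to TableRecordWriter.write()
  }
}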

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableReduce.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableReduce.java?rev=789846&r1=789845&r2=789846&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableReduce.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableReduce.java Tue Jun 30 17:42:53 2009
@@ -1,38 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.Reducer;
-
-/**
- * Write a table, sorting by the input key
- *
- * @param <K> key class
- * @param <V> value class
- */
-@SuppressWarnings("unchecked")
-public interface TableReduce<K extends WritableComparable, V extends Writable>
-extends Reducer<K, V, ImmutableBytesWritable, Put> {
-
-}
\ No newline at end of file
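
Since initTableReducerJob() expects a TableReducer subclass, a reducer written against plain Reducer (like the sketch above) can be wired up by hand with the same settings the utility applies. A fragment, with invented names:

package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;

/** Hypothetical driver fragment for the write side. */
public class ExampleWriteDriver {

  public static void main(String[] args) throws Exception {
    Job job = new Job(new HBaseConfiguration(), "example-write");
    job.setJarByClass(ExampleWriteDriver.class);
    // The same settings initTableReducerJob() makes, spelled out by hand:
    job.setOutputFormatClass(TableOutputFormat.class);
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "target_table");
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setReducerClass(ExampleSummingReducer.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}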


