hbase-commits mailing list archives

From: jdcry...@apache.org
Subject: svn commit: r899845 [1/2] - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/mapred/ src/java/package-info/ src/test/org/apache/hadoop/hbase/mapred/
Date: Sat, 16 Jan 2010 00:02:11 GMT
Author: jdcryans
Date: Sat Jan 16 00:02:10 2010
New Revision: 899845

URL: http://svn.apache.org/viewvc?rev=899845&view=rev
Log:
HBASE-2136  Forward-port the old mapred package

Added:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/Driver.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter_Counters.properties
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMap.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/package-info.java
    hadoop/hbase/trunk/src/java/package-info/
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=899845&r1=899844&r2=899845&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Sat Jan 16 00:02:10 2010
@@ -304,6 +304,7 @@
    HBASE-2107  Upgrading Lucene 2.2 to Lucene 3.0.0 (Kay Kay via Stack)
    HBASE-2111  Move to ivy broke our being able to run in-place; i.e.
                ./bin/start-hbase.sh in a checkout
+   HBASE-2136  Forward-port the old mapred package
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,206 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Example table column indexing class.  Runs a mapreduce job to index
+ * specified table columns.
+ * <ul><li>Each row is modeled as a Lucene document: row key is indexed in
+ * its untokenized form, column name-value pairs are Lucene field name-value
+ * pairs.</li>
+ * <li>A file passed on the command line is used to populate an
+ * {@link IndexConfiguration} which is used to set various Lucene parameters,
+ * specify whether to optimize an index and which columns to index and/or
+ * store, in tokenized or untokenized form, etc. For an example, see the
+ * <code>createIndexConfContent</code> method in TestTableIndex
+ * </li>
+ * <li>The number of reduce tasks decides the number of indexes (partitions).
+ * The index(es) are stored in the output path of the job configuration.</li>
+ * <li>The index build process is done in the reduce phase. Users can use
+ * the map phase to join rows from different tables or to pre-parse/analyze
+ * column content, etc.</li>
+ * </ul>
+ */
+@Deprecated
+public class BuildTableIndex {
+  private static final String USAGE = "Usage: BuildTableIndex " +
+    "-m <numMapTasks> -r <numReduceTasks>\n  -indexConf <iconfFile> " +
+    "-indexDir <indexDir>\n  -table <tableName> -columns <columnName1> " +
+    "[<columnName2> ...]";
+
+  private static void printUsage(String message) {
+    System.err.println(message);
+    System.err.println(USAGE);
+    System.exit(-1);
+  }
+
+  /** default constructor */
+  public BuildTableIndex() {
+    super();
+  }
+
+  /**
+   * @param args
+   * @throws IOException
+   */
+  public void run(String[] args) throws IOException {
+    if (args.length < 6) {
+      printUsage("Too few arguments");
+    }
+
+    int numMapTasks = 1;
+    int numReduceTasks = 1;
+    String iconfFile = null;
+    String indexDir = null;
+    String tableName = null;
+    StringBuffer columnNames = null;
+
+    // parse args
+    for (int i = 0; i < args.length - 1; i++) {
+      if ("-m".equals(args[i])) {
+        numMapTasks = Integer.parseInt(args[++i]);
+      } else if ("-r".equals(args[i])) {
+        numReduceTasks = Integer.parseInt(args[++i]);
+      } else if ("-indexConf".equals(args[i])) {
+        iconfFile = args[++i];
+      } else if ("-indexDir".equals(args[i])) {
+        indexDir = args[++i];
+      } else if ("-table".equals(args[i])) {
+        tableName = args[++i];
+      } else if ("-columns".equals(args[i])) {
+        columnNames = new StringBuffer(args[++i]);
+        while (i + 1 < args.length && !args[i + 1].startsWith("-")) {
+          columnNames.append(" ");
+          columnNames.append(args[++i]);
+        }
+      } else {
+        printUsage("Unsupported option " + args[i]);
+      }
+    }
+
+    if (indexDir == null || tableName == null || columnNames == null) {
+      printUsage("Index directory, table name and at least one column must " +
+        "be specified");
+    }
+
+    Configuration conf = new HBaseConfiguration();
+    if (iconfFile != null) {
+      // set index configuration content from a file
+      String content = readContent(iconfFile);
+      IndexConfiguration iconf = new IndexConfiguration();
+      // purely to validate, exception will be thrown if not valid
+      iconf.addFromXML(content);
+      conf.set("hbase.index.conf", content);
+    }
+
+    if (columnNames != null) {
+      JobConf jobConf = createJob(conf, numMapTasks, numReduceTasks, indexDir,
+          tableName, columnNames.toString());
+      JobClient.runJob(jobConf);
+    }
+  }
+
+  /**
+   * @param conf
+   * @param numMapTasks
+   * @param numReduceTasks
+   * @param indexDir
+   * @param tableName
+   * @param columnNames
+   * @return JobConf
+   */
+  public JobConf createJob(Configuration conf, int numMapTasks,
+      int numReduceTasks, String indexDir, String tableName,
+      String columnNames) {
+    JobConf jobConf = new JobConf(conf, BuildTableIndex.class);
+    jobConf.setJobName("build index for table " + tableName);
+    jobConf.setNumMapTasks(numMapTasks);
+    // number of indexes to partition into
+    jobConf.setNumReduceTasks(numReduceTasks);
+
+    // use identity map (a waste, but just as an example)
+    IdentityTableMap.initJob(tableName, columnNames, IdentityTableMap.class,
+        jobConf);
+
+    // use IndexTableReduce to build a Lucene index
+    jobConf.setReducerClass(IndexTableReduce.class);
+    FileOutputFormat.setOutputPath(jobConf, new Path(indexDir));
+    jobConf.setOutputFormat(IndexOutputFormat.class);
+
+    return jobConf;
+  }
+
+  /*
+   * Read xml file of indexing configurations.  The xml format is similar to
+   * hbase-default.xml and hadoop-default.xml. For an example configuration,
+   * see the <code>createIndexConfContent</code> method in TestTableIndex
+   * @param fileName File to read.
+   * @return XML configuration read from file
+   * @throws IOException
+   */
+  private String readContent(String fileName) throws IOException {
+    File file = new File(fileName);
+    int length = (int) file.length();
+    if (length == 0) {
+      printUsage("Index configuration file " + fileName + " does not exist");
+    }
+
+    int bytesRead = 0;
+    byte[] bytes = new byte[length];
+    FileInputStream fis = new FileInputStream(file);
+
+    try {
+      // read entire file into content
+      while (bytesRead < length) {
+        int read = fis.read(bytes, bytesRead, length - bytesRead);
+        if (read > 0) {
+          bytesRead += read;
+        } else {
+          break;
+        }
+      }
+    } finally {
+      fis.close();
+    }
+
+    return new String(bytes, 0, bytesRead, HConstants.UTF8_ENCODING);
+  }
+
+  /**
+   * @param args
+   * @throws IOException
+   */
+  public static void main(String[] args) throws IOException {
+    BuildTableIndex build = new BuildTableIndex();
+    build.run(args);
+  }
+}
\ No newline at end of file
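
The indexer above is driven entirely by its command-line arguments. A minimal usage sketch (the table, column, and path names below are hypothetical illustrations):

    import java.io.IOException;

    public class BuildTableIndexExample {
      public static void main(String[] args) throws IOException {
        // Index two column families of a hypothetical "webtable" into /index,
        // using 4 map tasks and 2 reduce tasks (i.e. 2 index partitions).
        // The -indexConf file is optional; omit it to use default settings.
        new org.apache.hadoop.hbase.mapred.BuildTableIndex().run(new String[] {
            "-m", "4",
            "-r", "2",
            "-indexConf", "index-conf.xml",
            "-indexDir", "/index",
            "-table", "webtable",
            "-columns", "contents:", "anchor:"
        });
      }
    }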

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/Driver.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/Driver.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/Driver.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/Driver.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,40 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.util.ProgramDriver;
+
+/**
+ * Driver for HBase MapReduce jobs. Select which to run by passing the
+ * name of the job to this main.
+ */
+@Deprecated
+public class Driver {
+  /**
+   * @param args
+   * @throws Throwable
+   */
+  public static void main(String[] args) throws Throwable {
+    ProgramDriver pgd = new ProgramDriver();
+    pgd.addClass(RowCounter.NAME, RowCounter.class,
+      "Count rows in HBase table");
+    pgd.driver(args);
+  }
+}
\ No newline at end of file
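
Driver is normally invoked through the job jar; a minimal sketch of dispatching a job by name (the output directory, table, and column below are hypothetical):

    public class DriverExample {
      public static void main(String[] args) throws Throwable {
        // "rowcounter" selects RowCounter; the remaining arguments are its
        // <outputdir> <tablename> <column1> parameters.
        org.apache.hadoop.hbase.mapred.Driver.main(new String[] {
            "rowcounter", "/tmp/rowcount-out", "webtable", "contents:"
        });
      }
    }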

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,162 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+
+/**
+ * Extract grouping columns from input record
+ */
+@Deprecated
+public class GroupingTableMap
+extends MapReduceBase
+implements TableMap<ImmutableBytesWritable,Result> {
+
+  /**
+   * JobConf parameter to specify the columns used to produce the key passed to
+   * collect from the map phase
+   */
+  public static final String GROUP_COLUMNS =
+    "hbase.mapred.groupingtablemap.columns";
+
+  protected byte [][] columns;
+
+  /**
+   * Use this before submitting a TableMap job. It will appropriately set up the
+   * JobConf.
+   *
+   * @param table table to be processed
+   * @param columns space separated list of columns to fetch
+   * @param groupColumns space separated list of columns used to form the key
+   * used in collect
+   * @param mapper map class
+   * @param job job configuration object
+   */
+  @SuppressWarnings("unchecked")
+  public static void initJob(String table, String columns, String groupColumns,
+    Class<? extends TableMap> mapper, JobConf job) {
+
+    TableMapReduceUtil.initTableMapJob(table, columns, mapper,
+        ImmutableBytesWritable.class, Result.class, job);
+    job.set(GROUP_COLUMNS, groupColumns);
+  }
+
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    String[] cols = job.get(GROUP_COLUMNS, "").split(" ");
+    columns = new byte[cols.length][];
+    for(int i = 0; i < cols.length; i++) {
+      columns[i] = Bytes.toBytes(cols[i]);
+    }
+  }
+
+  /**
+   * Extract the grouping columns from value to construct a new key.
+   *
+   * Pass the new key and value to reduce.
+   * If any of the grouping columns are not found in the value, the record is skipped.
+   * @param key
+   * @param value
+   * @param output
+   * @param reporter
+   * @throws IOException
+   */
+  public void map(ImmutableBytesWritable key, Result value,
+      OutputCollector<ImmutableBytesWritable,Result> output,
+      Reporter reporter) throws IOException {
+
+    byte[][] keyVals = extractKeyValues(value);
+    if(keyVals != null) {
+      ImmutableBytesWritable tKey = createGroupKey(keyVals);
+      output.collect(tKey, value);
+    }
+  }
+
+  /**
+   * Extract column values from the current record. This method returns
+   * null if any of the columns are not found.
+   *
+   * Override this method if you want to deal with nulls differently.
+   *
+   * @param r
+   * @return array of byte values
+   */
+  protected byte[][] extractKeyValues(Result r) {
+    byte[][] keyVals = null;
+    ArrayList<byte[]> foundList = new ArrayList<byte[]>();
+    int numCols = columns.length;
+    if (numCols > 0) {
+      for (KeyValue value: r.list()) {
+        byte [] column = KeyValue.makeColumn(value.getFamily(),
+            value.getQualifier());
+        for (int i = 0; i < numCols; i++) {
+          if (Bytes.equals(column, columns[i])) {
+            foundList.add(value.getValue());
+            break;
+          }
+        }
+      }
+      if(foundList.size() == numCols) {
+        keyVals = foundList.toArray(new byte[numCols][]);
+      }
+    }
+    return keyVals;
+  }
+
+  /**
+   * Create a key by concatenating multiple column values.
+   * Override this function in order to produce different types of keys.
+   *
+   * @param vals
+   * @return key generated by concatenating multiple column values
+   */
+  protected ImmutableBytesWritable createGroupKey(byte[][] vals) {
+    if(vals == null) {
+      return null;
+    }
+    StringBuilder sb =  new StringBuilder();
+    for(int i = 0; i < vals.length; i++) {
+      if(i > 0) {
+        sb.append(" ");
+      }
+      try {
+        sb.append(new String(vals[i], HConstants.UTF8_ENCODING));
+      } catch (UnsupportedEncodingException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return new ImmutableBytesWritable(Bytes.toBytes(sb.toString()));
+  }
+}
\ No newline at end of file
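
A sketch of wiring GroupingTableMap into a job through initJob; the table and column names are hypothetical:

    import org.apache.hadoop.hbase.mapred.GroupingTableMap;
    import org.apache.hadoop.mapred.JobConf;

    public class GroupingExample {
      public static JobConf createJob() {
        JobConf job = new JobConf(GroupingExample.class);
        // Scan two columns; the value of info:author becomes the map output
        // key, and rows that lack that column are skipped by the map.
        GroupingTableMap.initJob("webtable", "info:author info:category",
            "info:author", GroupingTableMap.class, job);
        return job;
      }
    }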

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,91 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+
+/**
+ * This is used to partition the output keys into groups of keys.
+ * Keys are grouped according to the regions that currently exist,
+ * so that each reducer fills a single region and load is distributed.
+ *
+ * @param <K2>
+ * @param <V2>
+ */
+@Deprecated
+public class HRegionPartitioner<K2,V2>
+implements Partitioner<ImmutableBytesWritable, V2> {
+  private final Log LOG = LogFactory.getLog(TableInputFormat.class);
+  private HTable table;
+  private byte[][] startKeys;
+
+  public void configure(JobConf job) {
+    try {
+      this.table = new HTable(new HBaseConfiguration(job),
+        job.get(TableOutputFormat.OUTPUT_TABLE));
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+
+    try {
+      this.startKeys = this.table.getStartKeys();
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+  }
+
+  public int getPartition(ImmutableBytesWritable key,
+      V2 value, int numPartitions) {
+    byte[] region = null;
+    // Only one region return 0
+    if (this.startKeys.length == 1){
+      return 0;
+    }
+    try {
+      // Not sure if this is cached after a split so we could have problems
+      // here if a region splits while mapping
+      region = table.getRegionLocation(key.get()).getRegionInfo().getStartKey();
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+    for (int i = 0; i < this.startKeys.length; i++){
+      if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
+        if (i >= numPartitions-1){
+          // cover the case where we have fewer reduce tasks than regions.
+          return (Integer.toString(i).hashCode()
+              & Integer.MAX_VALUE) % numPartitions;
+        }
+        return i;
+      }
+    }
+    // if the above fails to find a matching start key, return something
+    return 0;
+  }
+}
\ No newline at end of file
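
A sketch of plugging HRegionPartitioner into a reduce job that writes back to a table, assuming the TableOutputFormat.OUTPUT_TABLE constant read in configure() above is accessible; "webtable" is a hypothetical table name:

    import org.apache.hadoop.hbase.mapred.HRegionPartitioner;
    import org.apache.hadoop.hbase.mapred.TableOutputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class PartitionerExample {
      public static void configure(JobConf job) {
        // Tell the partitioner which table's regions to align reducers with,
        // then route each output key to the reducer owning its region.
        job.set(TableOutputFormat.OUTPUT_TABLE, "webtable");
        job.setPartitionerClass(HRegionPartitioner.class);
      }
    }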

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,76 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Pass the given key and record as-is to reduce
+ */
+@Deprecated
+public class IdentityTableMap
+extends MapReduceBase
+implements TableMap<ImmutableBytesWritable, Result> {
+
+  /** constructor */
+  public IdentityTableMap() {
+    super();
+  }
+
+  /**
+   * Use this before submitting a TableMap job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table table name
+   * @param columns columns to scan
+   * @param mapper mapper class
+   * @param job job configuration
+   */
+  @SuppressWarnings("unchecked")
+  public static void initJob(String table, String columns,
+    Class<? extends TableMap> mapper, JobConf job) {
+    TableMapReduceUtil.initTableMapJob(table, columns, mapper,
+      ImmutableBytesWritable.class,
+      Result.class, job);
+  }
+
+  /**
+   * Pass the key, value to reduce
+   * @param key
+   * @param value
+   * @param output
+   * @param reporter
+   * @throws IOException
+   */
+  public void map(ImmutableBytesWritable key, Result value,
+      OutputCollector<ImmutableBytesWritable,Result> output,
+      Reporter reporter) throws IOException {
+
+    // pass the key and value through unchanged
+    output.collect(key, value);
+  }
+}
\ No newline at end of file

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,61 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Write to table each key, record pair
+ */
+@Deprecated
+public class IdentityTableReduce
+extends MapReduceBase
+implements TableReduce<ImmutableBytesWritable, Put> {
+  @SuppressWarnings("unused")
+  private static final Log LOG =
+    LogFactory.getLog(IdentityTableReduce.class.getName());
+
+  /**
+   * No aggregation, output pairs of (key, record)
+   * @param key
+   * @param values
+   * @param output
+   * @param reporter
+   * @throws IOException
+   */
+  public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
+      OutputCollector<ImmutableBytesWritable, Put> output,
+      Reporter reporter)
+      throws IOException {
+
+    while(values.hasNext()) {
+      output.collect(key, values.next());
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,423 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.ByteArrayInputStream;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
+
+/**
+ * Configuration parameters for building a Lucene index
+ */
+@Deprecated
+public class IndexConfiguration extends Configuration {
+  private static final Log LOG = LogFactory.getLog(IndexConfiguration.class);
+
+  static final String HBASE_COLUMN_NAME = "hbase.column.name";
+  static final String HBASE_COLUMN_STORE = "hbase.column.store";
+  static final String HBASE_COLUMN_INDEX = "hbase.column.index";
+  static final String HBASE_COLUMN_TOKENIZE = "hbase.column.tokenize";
+  static final String HBASE_COLUMN_BOOST = "hbase.column.boost";
+  static final String HBASE_COLUMN_OMIT_NORMS = "hbase.column.omit.norms";
+  static final String HBASE_INDEX_ROWKEY_NAME = "hbase.index.rowkey.name";
+  static final String HBASE_INDEX_ANALYZER_NAME = "hbase.index.analyzer.name";
+  static final String HBASE_INDEX_MAX_BUFFERED_DOCS =
+    "hbase.index.max.buffered.docs";
+  static final String HBASE_INDEX_MAX_BUFFERED_DELS =
+    "hbase.index.max.buffered.dels";
+  static final String HBASE_INDEX_MAX_FIELD_LENGTH =
+    "hbase.index.max.field.length";
+  static final String HBASE_INDEX_MAX_MERGE_DOCS =
+    "hbase.index.max.merge.docs";
+  static final String HBASE_INDEX_MERGE_FACTOR = "hbase.index.merge.factor";
+  // double ramBufferSizeMB;
+  static final String HBASE_INDEX_SIMILARITY_NAME =
+    "hbase.index.similarity.name";
+  static final String HBASE_INDEX_USE_COMPOUND_FILE =
+    "hbase.index.use.compound.file";
+  static final String HBASE_INDEX_OPTIMIZE = "hbase.index.optimize";
+
+  public static class ColumnConf extends Properties {
+
+    private static final long serialVersionUID = 7419012290580607821L;
+
+    boolean getBoolean(String name, boolean defaultValue) {
+      String valueString = getProperty(name);
+      if ("true".equals(valueString))
+        return true;
+      else if ("false".equals(valueString))
+        return false;
+      else
+        return defaultValue;
+    }
+
+    void setBoolean(String name, boolean value) {
+      setProperty(name, Boolean.toString(value));
+    }
+
+    float getFloat(String name, float defaultValue) {
+      String valueString = getProperty(name);
+      if (valueString == null)
+        return defaultValue;
+      try {
+        return Float.parseFloat(valueString);
+      } catch (NumberFormatException e) {
+        return defaultValue;
+      }
+    }
+
+    void setFloat(String name, float value) {
+      setProperty(name, Float.toString(value));
+    }
+  }
+
+  private Map<String, ColumnConf> columnMap =
+    new ConcurrentHashMap<String, ColumnConf>();
+
+  public Iterator<String> columnNameIterator() {
+    return columnMap.keySet().iterator();
+  }
+
+  public boolean isIndex(String columnName) {
+    return getColumn(columnName).getBoolean(HBASE_COLUMN_INDEX, true);
+  }
+
+  public void setIndex(String columnName, boolean index) {
+    getColumn(columnName).setBoolean(HBASE_COLUMN_INDEX, index);
+  }
+
+  public boolean isStore(String columnName) {
+    return getColumn(columnName).getBoolean(HBASE_COLUMN_STORE, false);
+  }
+
+  public void setStore(String columnName, boolean store) {
+    getColumn(columnName).setBoolean(HBASE_COLUMN_STORE, store);
+  }
+
+  public boolean isTokenize(String columnName) {
+    return getColumn(columnName).getBoolean(HBASE_COLUMN_TOKENIZE, true);
+  }
+
+  public void setTokenize(String columnName, boolean tokenize) {
+    getColumn(columnName).setBoolean(HBASE_COLUMN_TOKENIZE, tokenize);
+  }
+
+  public float getBoost(String columnName) {
+    return getColumn(columnName).getFloat(HBASE_COLUMN_BOOST, 1.0f);
+  }
+
+  public void setBoost(String columnName, float boost) {
+    getColumn(columnName).setFloat(HBASE_COLUMN_BOOST, boost);
+  }
+
+  public boolean isOmitNorms(String columnName) {
+    return getColumn(columnName).getBoolean(HBASE_COLUMN_OMIT_NORMS, true);
+  }
+
+  public void setOmitNorms(String columnName, boolean omitNorms) {
+    getColumn(columnName).setBoolean(HBASE_COLUMN_OMIT_NORMS, omitNorms);
+  }
+
+  private ColumnConf getColumn(String columnName) {
+    ColumnConf column = columnMap.get(columnName);
+    if (column == null) {
+      column = new ColumnConf();
+      columnMap.put(columnName, column);
+    }
+    return column;
+  }
+
+  public String getAnalyzerName() {
+    return get(HBASE_INDEX_ANALYZER_NAME,
+        "org.apache.lucene.analysis.standard.StandardAnalyzer");
+  }
+
+  public void setAnalyzerName(String analyzerName) {
+    set(HBASE_INDEX_ANALYZER_NAME, analyzerName);
+  }
+
+  public int getMaxBufferedDeleteTerms() {
+    return getInt(HBASE_INDEX_MAX_BUFFERED_DELS, 1000);
+  }
+
+  public void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
+    setInt(HBASE_INDEX_MAX_BUFFERED_DELS, maxBufferedDeleteTerms);
+  }
+
+  public int getMaxBufferedDocs() {
+    return getInt(HBASE_INDEX_MAX_BUFFERED_DOCS, 10);
+  }
+
+  public void setMaxBufferedDocs(int maxBufferedDocs) {
+    setInt(HBASE_INDEX_MAX_BUFFERED_DOCS, maxBufferedDocs);
+  }
+
+  public int getMaxFieldLength() {
+    return getInt(HBASE_INDEX_MAX_FIELD_LENGTH, Integer.MAX_VALUE);
+  }
+
+  public void setMaxFieldLength(int maxFieldLength) {
+    setInt(HBASE_INDEX_MAX_FIELD_LENGTH, maxFieldLength);
+  }
+
+  public int getMaxMergeDocs() {
+    return getInt(HBASE_INDEX_MAX_MERGE_DOCS, Integer.MAX_VALUE);
+  }
+
+  public void setMaxMergeDocs(int maxMergeDocs) {
+    setInt(HBASE_INDEX_MAX_MERGE_DOCS, maxMergeDocs);
+  }
+
+  public int getMergeFactor() {
+    return getInt(HBASE_INDEX_MERGE_FACTOR, 10);
+  }
+
+  public void setMergeFactor(int mergeFactor) {
+    setInt(HBASE_INDEX_MERGE_FACTOR, mergeFactor);
+  }
+
+  public String getRowkeyName() {
+    return get(HBASE_INDEX_ROWKEY_NAME, "ROWKEY");
+  }
+
+  public void setRowkeyName(String rowkeyName) {
+    set(HBASE_INDEX_ROWKEY_NAME, rowkeyName);
+  }
+
+  public String getSimilarityName() {
+    return get(HBASE_INDEX_SIMILARITY_NAME, null);
+  }
+
+  public void setSimilarityName(String similarityName) {
+    set(HBASE_INDEX_SIMILARITY_NAME, similarityName);
+  }
+
+  public boolean isUseCompoundFile() {
+    return getBoolean(HBASE_INDEX_USE_COMPOUND_FILE, false);
+  }
+
+  public void setUseCompoundFile(boolean useCompoundFile) {
+    setBoolean(HBASE_INDEX_USE_COMPOUND_FILE, useCompoundFile);
+  }
+
+  public boolean doOptimize() {
+    return getBoolean(HBASE_INDEX_OPTIMIZE, true);
+  }
+
+  public void setDoOptimize(boolean doOptimize) {
+    setBoolean(HBASE_INDEX_OPTIMIZE, doOptimize);
+  }
+
+  public void addFromXML(String content) {
+    try {
+      DocumentBuilder builder = DocumentBuilderFactory.newInstance()
+          .newDocumentBuilder();
+
+      Document doc = builder
+          .parse(new ByteArrayInputStream(content.getBytes()));
+
+      Element root = doc.getDocumentElement();
+      if (!"configuration".equals(root.getTagName())) {
+        LOG.fatal("bad conf file: top-level element not <configuration>");
+      }
+
+      NodeList props = root.getChildNodes();
+      for (int i = 0; i < props.getLength(); i++) {
+        Node propNode = props.item(i);
+        if (!(propNode instanceof Element)) {
+          continue;
+        }
+
+        Element prop = (Element) propNode;
+        if ("property".equals(prop.getTagName())) {
+          propertyFromXML(prop, null);
+        } else if ("column".equals(prop.getTagName())) {
+          columnConfFromXML(prop);
+        } else {
+          LOG.warn("bad conf content: element neither <property> nor <column>");
+        }
+      }
+    } catch (Exception e) {
+      LOG.fatal("error parsing conf content: " + e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void propertyFromXML(Element prop, Properties properties) {
+    NodeList fields = prop.getChildNodes();
+    String attr = null;
+    String value = null;
+
+    for (int j = 0; j < fields.getLength(); j++) {
+      Node fieldNode = fields.item(j);
+      if (!(fieldNode instanceof Element)) {
+        continue;
+      }
+
+      Element field = (Element) fieldNode;
+      if ("name".equals(field.getTagName())) {
+        attr = ((Text) field.getFirstChild()).getData();
+      }
+      if ("value".equals(field.getTagName()) && field.hasChildNodes()) {
+        value = ((Text) field.getFirstChild()).getData();
+      }
+    }
+
+    if (attr != null && value != null) {
+      if (properties == null) {
+        set(attr, value);
+      } else {
+        properties.setProperty(attr, value);
+      }
+    }
+  }
+
+  private void columnConfFromXML(Element column) {
+    ColumnConf columnConf = new ColumnConf();
+    NodeList props = column.getChildNodes();
+    for (int i = 0; i < props.getLength(); i++) {
+      Node propNode = props.item(i);
+      if (!(propNode instanceof Element)) {
+        continue;
+      }
+
+      Element prop = (Element) propNode;
+      if ("property".equals(prop.getTagName())) {
+        propertyFromXML(prop, columnConf);
+      } else {
+        LOG.warn("bad conf content: element not <property>");
+      }
+    }
+
+    if (columnConf.getProperty(HBASE_COLUMN_NAME) != null) {
+      columnMap.put(columnConf.getProperty(HBASE_COLUMN_NAME), columnConf);
+    } else {
+      LOG.warn("bad column conf: name not specified");
+    }
+  }
+
+  public void write(OutputStream out) {
+    try {
+      Document doc = writeDocument();
+      DOMSource source = new DOMSource(doc);
+      StreamResult result = new StreamResult(out);
+      TransformerFactory transFactory = TransformerFactory.newInstance();
+      Transformer transformer = transFactory.newTransformer();
+      transformer.transform(source, result);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Document writeDocument() {
+    Iterator<Map.Entry<String, String>> iter = iterator();
+    try {
+      Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
+          .newDocument();
+      Element conf = doc.createElement("configuration");
+      doc.appendChild(conf);
+      conf.appendChild(doc.createTextNode("\n"));
+
+      Map.Entry<String, String> entry;
+      while (iter.hasNext()) {
+        entry = iter.next();
+        String name = entry.getKey();
+        String value = entry.getValue();
+        writeProperty(doc, conf, name, value);
+      }
+
+      Iterator<String> columnIter = columnNameIterator();
+      while (columnIter.hasNext()) {
+        writeColumn(doc, conf, columnIter.next());
+      }
+
+      return doc;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void writeProperty(Document doc, Element parent, String name,
+      String value) {
+    Element propNode = doc.createElement("property");
+    parent.appendChild(propNode);
+
+    Element nameNode = doc.createElement("name");
+    nameNode.appendChild(doc.createTextNode(name));
+    propNode.appendChild(nameNode);
+
+    Element valueNode = doc.createElement("value");
+    valueNode.appendChild(doc.createTextNode(value));
+    propNode.appendChild(valueNode);
+
+    parent.appendChild(doc.createTextNode("\n"));
+  }
+
+  private void writeColumn(Document doc, Element parent, String columnName) {
+    Element column = doc.createElement("column");
+    parent.appendChild(column);
+    column.appendChild(doc.createTextNode("\n"));
+
+    ColumnConf columnConf = getColumn(columnName);
+    for (Map.Entry<Object, Object> entry : columnConf.entrySet()) {
+      if (entry.getKey() instanceof String
+          && entry.getValue() instanceof String) {
+        writeProperty(doc, column, (String) entry.getKey(), (String) entry
+            .getValue());
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringWriter writer = new StringWriter();
+    try {
+      Document doc = writeDocument();
+      DOMSource source = new DOMSource(doc);
+      StreamResult result = new StreamResult(writer);
+      TransformerFactory transFactory = TransformerFactory.newInstance();
+      Transformer transformer = transFactory.newTransformer();
+      transformer.transform(source, result);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return writer.toString();
+  }
+}
\ No newline at end of file
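
addFromXML() expects a document in the same style as hbase-default.xml, extended with <column> elements for per-column settings. A minimal sketch (the column name "contents:" is a hypothetical example):

    import org.apache.hadoop.hbase.mapred.IndexConfiguration;

    public class IndexConfExample {
      public static void main(String[] args) {
        // One global Lucene setting plus per-column settings for "contents:".
        String xml =
            "<configuration>"
          + " <property><name>hbase.index.max.buffered.docs</name>"
          + "  <value>500</value></property>"
          + " <column>"
          + "  <property><name>hbase.column.name</name><value>contents:</value></property>"
          + "  <property><name>hbase.column.store</name><value>true</value></property>"
          + "  <property><name>hbase.column.tokenize</name><value>true</value></property>"
          + " </column>"
          + "</configuration>";
        IndexConfiguration iconf = new IndexConfiguration();
        iconf.addFromXML(xml);                          // throws if malformed
        System.out.println(iconf.getMaxBufferedDocs()); // 500
        System.out.println(iconf.isStore("contents:")); // true
      }
    }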

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,166 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.io.File;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.search.Similarity;
+import org.apache.lucene.store.FSDirectory;
+
+/**
+ * Create a local index, unwrap Lucene documents created by reduce, add them to
+ * the index, and copy the index to the destination.
+ */
+@Deprecated
+public class IndexOutputFormat extends
+    FileOutputFormat<ImmutableBytesWritable, LuceneDocumentWrapper> {
+  static final Log LOG = LogFactory.getLog(IndexOutputFormat.class);
+
+  private Random random = new Random();
+
+  @Override
+  public RecordWriter<ImmutableBytesWritable, LuceneDocumentWrapper>
+  getRecordWriter(final FileSystem fs, JobConf job, String name,
+      final Progressable progress)
+  throws IOException {
+
+    final Path perm = new Path(FileOutputFormat.getOutputPath(job), name);
+    final Path temp = job.getLocalPath("index/_"
+        + Integer.toString(random.nextInt()));
+
+    LOG.info("To index into " + perm);
+
+    // delete old, if any
+    fs.delete(perm, true);
+
+    final IndexConfiguration indexConf = new IndexConfiguration();
+    String content = job.get("hbase.index.conf");
+    if (content != null) {
+      indexConf.addFromXML(content);
+    }
+
+    String analyzerName = indexConf.getAnalyzerName();
+    Analyzer analyzer;
+    try {
+      Class<?> analyzerClass = Class.forName(analyzerName);
+      analyzer = (Analyzer) analyzerClass.newInstance();
+    } catch (Exception e) {
+      throw new IOException("Error in creating an analyzer object "
+          + analyzerName);
+    }
+
+    // build locally first
+    final IndexWriter writer = new IndexWriter(FSDirectory.open(new File(fs.startLocalOutput(perm, temp)
+        .toString())), analyzer, true, IndexWriter.MaxFieldLength.LIMITED);
+
+    // no delete, so no need for maxBufferedDeleteTerms
+    writer.setMaxBufferedDocs(indexConf.getMaxBufferedDocs());
+    writer.setMaxFieldLength(indexConf.getMaxFieldLength());
+    writer.setMaxMergeDocs(indexConf.getMaxMergeDocs());
+    writer.setMergeFactor(indexConf.getMergeFactor());
+    String similarityName = indexConf.getSimilarityName();
+    if (similarityName != null) {
+      try {
+        Class<?> similarityClass = Class.forName(similarityName);
+        Similarity similarity = (Similarity) similarityClass.newInstance();
+        writer.setSimilarity(similarity);
+      } catch (Exception e) {
+        throw new IOException("Error in creating a similarity object "
+            + similarityName);
+      }
+    }
+    writer.setUseCompoundFile(indexConf.isUseCompoundFile());
+
+    return new RecordWriter<ImmutableBytesWritable, LuceneDocumentWrapper>() {
+      boolean closed;
+      private long docCount = 0;
+
+      public void write(ImmutableBytesWritable key,
+          LuceneDocumentWrapper value)
+      throws IOException {
+        // unwrap and index doc
+        Document doc = value.get();
+        writer.addDocument(doc);
+        docCount++;
+        progress.progress();
+      }
+
+      public void close(final Reporter reporter) throws IOException {
+        // spawn a thread to give progress heartbeats
+        Thread prog = new Thread() {
+          @Override
+          public void run() {
+            while (!closed) {
+              try {
+                reporter.setStatus("closing");
+                Thread.sleep(1000);
+              } catch (InterruptedException e) {
+                continue;
+              } catch (Throwable e) {
+                return;
+              }
+            }
+          }
+        };
+
+        try {
+          prog.start();
+
+          // optimize index
+          if (indexConf.doOptimize()) {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("Optimizing index.");
+            }
+            writer.optimize();
+          }
+
+          // close index
+          writer.close();
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Done indexing " + docCount + " docs.");
+          }
+
+          // copy to perm destination in dfs
+          fs.completeLocalOutput(perm, temp);
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Copy done.");
+          }
+        } finally {
+          closed = true;
+        }
+      }
+    };
+  }
+}
\ No newline at end of file

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,108 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Construct a Lucene document per row, which is consumed by IndexOutputFormat
+ * to build a Lucene index
+ */
+@Deprecated
+public class IndexTableReduce extends MapReduceBase implements
+    Reducer<ImmutableBytesWritable, Result, ImmutableBytesWritable, LuceneDocumentWrapper> {
+  private static final Log LOG = LogFactory.getLog(IndexTableReduce.class);
+  private IndexConfiguration indexConf;
+
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    indexConf = new IndexConfiguration();
+    String content = job.get("hbase.index.conf");
+    if (content != null) {
+      indexConf.addFromXML(content);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Index conf: " + indexConf);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
+
+  public void reduce(ImmutableBytesWritable key, Iterator<Result> values,
+      OutputCollector<ImmutableBytesWritable, LuceneDocumentWrapper> output,
+      Reporter reporter)
+  throws IOException {
+    Document doc = null;
+    while(values.hasNext()) {
+      Result r = values.next();
+      if (doc == null) {
+        doc = new Document();
+        // index and store row key, row key already UTF-8 encoded
+        Field keyField = new Field(indexConf.getRowkeyName(),
+          Bytes.toString(key.get(), key.getOffset(), key.getLength()),
+          Field.Store.YES, Field.Index.NOT_ANALYZED);
+        keyField.setOmitNorms(true);
+        doc.add(keyField);
+      }
+      // each column (name-value pair) is a field (name-value pair)
+      for (KeyValue kv: r.list()) {
+        // name is already UTF-8 encoded
+        String column = Bytes.toString(KeyValue.makeColumn(kv.getFamily(),
+            kv.getQualifier()));
+        byte[] columnValue = kv.getValue();
+        Field.Store store = indexConf.isStore(column)?
+          Field.Store.YES: Field.Store.NO;
+        Field.Index index = indexConf.isIndex(column)?
+          (indexConf.isTokenize(column)?
+            Field.Index.ANALYZED: Field.Index.NOT_ANALYZED):
+            Field.Index.NO;
+
+        // UTF-8 encode value
+        Field field = new Field(column, Bytes.toString(columnValue),
+          store, index);
+        field.setBoost(indexConf.getBoost(column));
+        field.setOmitNorms(indexConf.isOmitNorms(column));
+
+        doc.add(field);
+      }
+    }
+    output.collect(key, new LuceneDocumentWrapper(doc));
+  }
+}
\ No newline at end of file

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,56 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import org.apache.hadoop.io.Writable;
+import org.apache.lucene.document.Document;
+
+/**
+ * A utility class used to pass a Lucene document from reduce to OutputFormat.
+ * It doesn't really serialize/deserialize a Lucene document.
+ */
+@Deprecated
+public class LuceneDocumentWrapper implements Writable {
+  protected Document doc;
+
+  /**
+   * @param doc
+   */
+  public LuceneDocumentWrapper(Document doc) {
+    this.doc = doc;
+  }
+
+  /**
+   * @return the document
+   */
+  public Document get() {
+    return doc;
+  }
+
+  public void readFields(DataInput in) {
+    // intentionally left blank
+  }
+
+  public void write(DataOutput out) {
+    // intentionally left blank
+  }
+}
\ No newline at end of file

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,137 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A job with a map to count rows.
+ * The map increments a ROWS counter for each input row that has at least one
+ * non-empty column value; rows whose values are all empty are skipped.
+ * Uses an {@link IdentityReducer}.
+ */
+@Deprecated
+public class RowCounter extends Configured implements Tool {
+  // Name of this 'program'
+  static final String NAME = "rowcounter";
+
+  /**
+   * Mapper that runs the count.
+   */
+  static class RowCounterMapper
+  implements TableMap<ImmutableBytesWritable, Result> {
+    private static enum Counters {ROWS}
+
+    public void map(ImmutableBytesWritable row, Result values,
+        OutputCollector<ImmutableBytesWritable, Result> output,
+        Reporter reporter)
+    throws IOException {
+      boolean content = false;
+
+      for (KeyValue value: values.list()) {
+        if (value.getValue().length > 0) {
+          content = true;
+          break;
+        }
+      }
+      if (!content) {
+        // Don't count rows that are all empty values.
+        return;
+      }
+      // We only care about the number of rows, so just bump the counter.
+      reporter.incrCounter(Counters.ROWS, 1);
+    }
+
+    public void configure(JobConf jc) {
+      // Nothing to do.
+    }
+
+    public void close() throws IOException {
+      // Nothing to do.
+    }
+  }
+
+  /**
+   * @param args the command line arguments: output directory, table name, columns
+   * @return the configured JobConf
+   * @throws IOException
+   */
+  public JobConf createSubmittableJob(String[] args) throws IOException {
+    JobConf c = new JobConf(getConf(), getClass());
+    c.setJobName(NAME);
+    // Columns are space delimited
+    StringBuilder sb = new StringBuilder();
+    final int columnoffset = 2;
+    for (int i = columnoffset; i < args.length; i++) {
+      if (i > columnoffset) {
+        sb.append(" ");
+      }
+      sb.append(args[i]);
+    }
+    // Second argument is the table name.
+    TableMapReduceUtil.initTableMapJob(args[1], sb.toString(),
+      RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, c);
+    c.setNumReduceTasks(0);
+    // First arg is the output directory.
+    FileOutputFormat.setOutputPath(c, new Path(args[0]));
+    return c;
+  }
+
+  static int printUsage() {
+    System.out.println(NAME +
+      " <outputdir> <tablename> <column1> [<column2>...]");
+    return -1;
+  }
+
+  public int run(final String[] args) throws Exception {
+    // Make sure there are at least 3 parameters
+    if (args.length < 3) {
+      System.err.println("ERROR: Wrong number of parameters: " + args.length);
+      return printUsage();
+    }
+    JobClient.runJob(createSubmittableJob(args));
+    return 0;
+  }
+
+  /**
+   * @param args the command line arguments
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    HBaseConfiguration c = new HBaseConfiguration();
+    int errCode = ToolRunner.run(c, new RowCounter(), args);
+    System.exit(errCode);
+  }
+}
\ No newline at end of file
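
A hedged usage sketch for RowCounter: it can be launched programmatically
through ToolRunner, exactly as its own main() does. The output directory,
table name and column below are placeholders.

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapred.RowCounter;
    import org.apache.hadoop.util.ToolRunner;

    public class RunRowCounter {
      public static void main(String[] args) throws Exception {
        // Argument order matches printUsage():
        //   <outputdir> <tablename> <column1> [<column2>...]
        String[] jobArgs = { "/tmp/rowcounter-out", "myTable", "info:name" };
        int errCode = ToolRunner.run(
            new HBaseConfiguration(), new RowCounter(), jobArgs);
        System.exit(errCode);
      }
    }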

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter_Counters.properties
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter_Counters.properties?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter_Counters.properties (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter_Counters.properties Sat Jan 16 00:02:10 2010
@@ -0,0 +1,6 @@
+
+# ResourceBundle properties file for RowCounter MR job
+
+CounterGroupName=         RowCounter
+
+ROWS.name=                Rows
\ No newline at end of file

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,83 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Convert HBase tabular data into a format that is consumable by Map/Reduce.
+ */
+@Deprecated
+public class TableInputFormat extends TableInputFormatBase implements
+    JobConfigurable {
+  private final Log LOG = LogFactory.getLog(TableInputFormat.class);
+
+  /**
+   * Space-delimited list of columns to scan.
+   */
+  public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
+
+  public void configure(JobConf job) {
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    String colArg = job.get(COLUMN_LIST);
+    String[] colNames = colArg.split(" ");
+    byte [][] m_cols = new byte[colNames.length][];
+    for (int i = 0; i < m_cols.length; i++) {
+      m_cols[i] = Bytes.toBytes(colNames[i]);
+    }
+    setInputColumns(m_cols);
+    try {
+      setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
+    } catch (Exception e) {
+      LOG.error(StringUtils.stringifyException(e));
+    }
+  }
+
+  public void validateInput(JobConf job) throws IOException {
+    // expecting exactly one path
+    Path [] tableNames = FileInputFormat.getInputPaths(job);
+    if (tableNames == null || tableNames.length > 1) {
+      throw new IOException("expecting one table name");
+    }
+
+    // connected to table?
+    if (getHTable() == null) {
+      throw new IOException("could not connect to table '" +
+        tableNames[0].getName() + "'");
+    }
+
+    // expecting at least one column
+    String colArg = job.get(COLUMN_LIST);
+    if (colArg == null || colArg.length() == 0) {
+      throw new IOException("expecting at least one column");
+    }
+  }
+}
\ No newline at end of file
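
A hedged sketch of pointing a job at this input format by hand, without
TableMapReduceUtil: the table name is added as an input path and the
space-delimited column list goes under COLUMN_LIST, matching what configure()
and validateInput() above expect. The table and column names are placeholders.

    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapred.TableInputFormat;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class ConfigureTableInput {
      public static void configure(JobConf job) {
        job.setInputFormat(TableInputFormat.class);
        // configure() reads the table name from the job's input path list.
        FileInputFormat.addInputPaths(job, "myTable");
        // Space-delimited family:qualifier columns.
        job.set(TableInputFormat.COLUMN_LIST, "info:name info:age");
        // The format produces (ImmutableBytesWritable, Result) records.
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Result.class);
      }
    }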

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,347 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A base for {@link TableInputFormat}s. Receives an {@link HTable}, a
+ * byte [][] of input columns and optionally a {@link Filter}.
+ * Subclasses may use other TableRecordReader implementations.
+ * <p>
+ * An example of a subclass:
+ * <pre>
+ *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
+ *
+ *     public void configure(JobConf job) {
+ *       HTable exampleTable = new HTable(new HBaseConfiguration(job),
+ *         Bytes.toBytes("exampleTable"));
+ *       // mandatory
+ *       setHTable(exampleTable);
+ *       byte [][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ *         Bytes.toBytes("columnB") };
+ *       // mandatory
+ *       setInputColumns(inputColumns);
+ *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL,
+ *         new RegexStringComparator("keyPrefix.*"));
+ *       // optional
+ *       setRowFilter(exampleFilter);
+ *     }
+ *
+ *     public void validateInput(JobConf job) throws IOException {
+ *     }
+ *  }
+ * </pre>
+ */
+
+@Deprecated
+public abstract class TableInputFormatBase
+implements InputFormat<ImmutableBytesWritable, Result> {
+  final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
+  private byte [][] inputColumns;
+  private HTable table;
+  private TableRecordReader tableRecordReader;
+  private Filter rowFilter;
+
+  /**
+   * Iterates over HBase table data, returning (ImmutableBytesWritable, Result) pairs.
+   */
+  protected class TableRecordReader
+  implements RecordReader<ImmutableBytesWritable, Result> {
+    private byte [] startRow;
+    private byte [] endRow;
+    private byte [] lastRow;
+    private Filter trrRowFilter;
+    private ResultScanner scanner;
+    private HTable htable;
+    private byte [][] trrInputColumns;
+
+    /**
+     * Restart from survivable exceptions by creating a new scanner.
+     *
+     * @param firstRow
+     * @throws IOException
+     */
+    public void restart(byte[] firstRow) throws IOException {
+      if ((endRow != null) && (endRow.length > 0)) {
+        if (trrRowFilter != null) {
+          Scan scan = new Scan(firstRow, endRow);
+          scan.addColumns(trrInputColumns);
+          scan.setFilter(trrRowFilter);
+          this.scanner = this.htable.getScanner(scan);
+        } else {
+          LOG.debug("TIFB.restart, firstRow: " +
+              Bytes.toStringBinary(firstRow) + ", endRow: " +
+              Bytes.toStringBinary(endRow));
+          Scan scan = new Scan(firstRow, endRow);
+          scan.addColumns(trrInputColumns);
+          this.scanner = this.htable.getScanner(scan);
+        }
+      } else {
+        LOG.debug("TIFB.restart, firstRow: " +
+            Bytes.toStringBinary(firstRow) + ", no endRow");
+
+        Scan scan = new Scan(firstRow);
+        scan.addColumns(trrInputColumns);
+//        scan.setFilter(trrRowFilter);
+        this.scanner = this.htable.getScanner(scan);
+      }
+    }
+
+    /**
+     * Build the scanner. Not done in constructor to allow for extension.
+     *
+     * @throws IOException
+     */
+    public void init() throws IOException {
+      restart(startRow);
+    }
+
+    /**
+     * @param htable the {@link HTable} to scan.
+     */
+    public void setHTable(HTable htable) {
+      this.htable = htable;
+    }
+
+    /**
+     * @param inputColumns the columns to be placed in {@link Result}.
+     */
+    public void setInputColumns(final byte [][] inputColumns) {
+      this.trrInputColumns = inputColumns;
+    }
+
+    /**
+     * @param startRow the first row in the split
+     */
+    public void setStartRow(final byte [] startRow) {
+      this.startRow = startRow;
+    }
+
+    /**
+     *
+     * @param endRow the last row in the split
+     */
+    public void setEndRow(final byte [] endRow) {
+      this.endRow = endRow;
+    }
+
+    /**
+     * @param rowFilter the {@link Filter} to be used.
+     */
+    public void setRowFilter(Filter rowFilter) {
+      this.trrRowFilter = rowFilter;
+    }
+
+    public void close() {
+      this.scanner.close();
+    }
+
+    /**
+     * @return ImmutableBytesWritable
+     *
+     * @see org.apache.hadoop.mapred.RecordReader#createKey()
+     */
+    public ImmutableBytesWritable createKey() {
+      return new ImmutableBytesWritable();
+    }
+
+    /**
+     * @return Result
+     *
+     * @see org.apache.hadoop.mapred.RecordReader#createValue()
+     */
+    public Result createValue() {
+      return new Result();
+    }
+
+    public long getPos() {
+      // This should be the ordinal tuple in the range;
+      // not clear how to calculate...
+      return 0;
+    }
+
+    public float getProgress() {
+      // Depends on the total number of tuples and getPos
+      return 0;
+    }
+
+    /**
+     * @param key ImmutableBytesWritable as input key.
+     * @param value Result as input value
+     * @return true if there was more data
+     * @throws IOException
+     */
+    public boolean next(ImmutableBytesWritable key, Result value)
+    throws IOException {
+      Result result;
+      try {
+        result = this.scanner.next();
+      } catch (UnknownScannerException e) {
+        LOG.debug("recovered from " + StringUtils.stringifyException(e));
+        restart(lastRow);
+        this.scanner.next();    // skip presumed already mapped row
+        result = this.scanner.next();
+      }
+
+      if (result != null && result.size() > 0) {
+        key.set(result.getRow());
+        lastRow = key.get();
+        Writables.copyWritable(result, value);
+        return true;
+      }
+      return false;
+    }
+  }
+
+  /**
+   * Builds a TableRecordReader. If no TableRecordReader was provided, uses
+   * the default.
+   *
+   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
+   *      JobConf, Reporter)
+   */
+  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter)
+  throws IOException {
+    TableSplit tSplit = (TableSplit) split;
+    TableRecordReader trr = this.tableRecordReader;
+    // if no table record reader was provided use default
+    if (trr == null) {
+      trr = new TableRecordReader();
+    }
+    trr.setStartRow(tSplit.getStartRow());
+    trr.setEndRow(tSplit.getEndRow());
+    trr.setHTable(this.table);
+    trr.setInputColumns(this.inputColumns);
+    trr.setRowFilter(this.rowFilter);
+    trr.init();
+    return trr;
+  }
+
+  /**
+   * Calculates the splits that will serve as input for the map tasks.
+   * <p>
+   * Splits are created in a number equal to the smaller of numSplits and
+   * the number of {@link HRegion}s in the table. If the number of splits is
+   * smaller than the number of {@link HRegion}s then each split spans
+   * multiple {@link HRegion}s, grouped as evenly as possible. When the
+   * splits are uneven, the bigger splits are placed first in the
+   * {@link InputSplit} array.
+   *
+   * @param job the map task {@link JobConf}
+   * @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
+   *
+   * @return the input splits
+   *
+   * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
+   */
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    if (this.table == null) {
+      throw new IOException("No table was provided");
+    }
+    byte [][] startKeys = this.table.getStartKeys();
+    if (startKeys == null || startKeys.length == 0) {
+      throw new IOException("Expecting at least one region");
+    }
+    if (this.inputColumns == null || this.inputColumns.length == 0) {
+      throw new IOException("Expecting at least one column");
+    }
+    int realNumSplits = numSplits > startKeys.length? startKeys.length:
+      numSplits;
+    InputSplit[] splits = new InputSplit[realNumSplits];
+    int middle = startKeys.length / realNumSplits;
+    int startPos = 0;
+    for (int i = 0; i < realNumSplits; i++) {
+      int lastPos = startPos + middle;
+      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
+      String regionLocation = table.getRegionLocation(startKeys[startPos]).
+        getServerAddress().getHostname();
+      splits[i] = new TableSplit(this.table.getTableName(),
+        startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
+          HConstants.EMPTY_START_ROW, regionLocation);
+      LOG.info("split: " + i + "->" + splits[i]);
+      startPos = lastPos;
+    }
+    return splits;
+  }
+
+  /**
+   * @param inputColumns to be passed in {@link Result} to the map task.
+   */
+  protected void setInputColumns(byte [][] inputColumns) {
+    this.inputColumns = inputColumns;
+  }
+
+  /**
+   * Allows subclasses to get the {@link HTable}.
+   */
+  protected HTable getHTable() {
+    return this.table;
+  }
+
+  /**
+   * Allows subclasses to set the {@link HTable}.
+   *
+   * @param table to get the data from
+   */
+  protected void setHTable(HTable table) {
+    this.table = table;
+  }
+
+  /**
+   * Allows subclasses to set the {@link TableRecordReader}.
+   *
+   * @param tableRecordReader
+   *                to provide other {@link TableRecordReader} implementations.
+   */
+  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+    this.tableRecordReader = tableRecordReader;
+  }
+
+  /**
+   * Allows subclasses to set the {@link Filter} to be used.
+   *
+   * @param rowFilter
+   */
+  protected void setRowFilter(Filter rowFilter) {
+    this.rowFilter = rowFilter;
+  }
+}
\ No newline at end of file
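
A compiling variant of the subclass sketched in the class javadoc above,
offered as a hedged illustration; the table name, columns and filter regex are
placeholders, and the filter uses the Filter API that setRowFilter() expects.

    import java.io.IOException;

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.RegexStringComparator;
    import org.apache.hadoop.hbase.filter.RowFilter;
    import org.apache.hadoop.hbase.mapred.TableInputFormatBase;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobConfigurable;

    public class ExampleTIF extends TableInputFormatBase
        implements JobConfigurable {

      public void configure(JobConf job) {
        try {
          // mandatory: the table to scan
          setHTable(new HTable(new HBaseConfiguration(job), "exampleTable"));
        } catch (IOException e) {
          throw new RuntimeException("could not open exampleTable", e);
        }
        // mandatory: the columns handed to the map in each Result
        setInputColumns(new byte[][] {
            Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
        // optional: keep only rows whose key matches the regex
        Filter exampleFilter = new RowFilter(CompareOp.EQUAL,
            new RegexStringComparator("keyPrefix.*"));
        setRowFilter(exampleFilter);
      }
    }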

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMap.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMap.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMap.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,39 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Mapper;
+
+/**
+ * A map task over an HBase table: implementations receive one row at a time
+ * as an (ImmutableBytesWritable row key, {@link Result}) pair and emit keys
+ * and values of the declared output types.
+ *
+ * @param <K> WritableComparable key class
+ * @param <V> Writable value class
+ */
+@Deprecated
+public interface TableMap<K extends WritableComparable<? super K>, V extends Writable>
+extends Mapper<ImmutableBytesWritable, Result, K, V> {
+
+}
\ No newline at end of file
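
A hedged sketch of an implementation: a pass-through mapper that hands each
scanned row straight to the next stage, i.e. the identity mapping over
(row key, Result) pairs. The class name is a placeholder.

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapred.TableMap;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class PassThroughTableMap
        implements TableMap<ImmutableBytesWritable, Result> {

      public void configure(JobConf job) {
        // no configuration needed for this sketch
      }

      public void map(ImmutableBytesWritable key, Result value,
          OutputCollector<ImmutableBytesWritable, Result> output,
          Reporter reporter) throws IOException {
        // hand the row straight through to the reduce / output format
        output.collect(key, value);
      }

      public void close() throws IOException {
        // nothing to release
      }
    }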

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java?rev=899845&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java Sat Jan 16 00:02:10 2010
@@ -0,0 +1,184 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Utility for {@link TableMap} and {@link TableReduce}
+ */
+@Deprecated
+@SuppressWarnings("unchecked")
+public class TableMapReduceUtil {
+
+  /**
+   * Use this before submitting a TableMap job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table  The table name to read from.
+   * @param columns  The columns to scan.
+   * @param mapper  The mapper class to use.
+   * @param outputKeyClass  The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job  The current job configuration to adjust.
+   */
+  public static void initTableMapJob(String table, String columns,
+    Class<? extends TableMap> mapper,
+    Class<? extends WritableComparable> outputKeyClass,
+    Class<? extends Writable> outputValueClass, JobConf job) {
+
+    job.setInputFormat(TableInputFormat.class);
+    job.setMapOutputValueClass(outputValueClass);
+    job.setMapOutputKeyClass(outputKeyClass);
+    job.setMapperClass(mapper);
+    FileInputFormat.addInputPaths(job, table);
+    job.set(TableInputFormat.COLUMN_LIST, columns);
+  }
+
+  /**
+   * Use this before submitting a TableReduce job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table  The output table.
+   * @param reducer  The reducer class to use.
+   * @param job  The current job configuration to adjust.
+   * @throws IOException When determining the region count fails.
+   */
+  public static void initTableReduceJob(String table,
+    Class<? extends TableReduce> reducer, JobConf job)
+  throws IOException {
+    initTableReduceJob(table, reducer, job, null);
+  }
+
+  /**
+   * Use this before submitting a TableReduce job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table  The output table.
+   * @param reducer  The reducer class to use.
+   * @param job  The current job configuration to adjust.
+   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
+   * default partitioner.
+   * @throws IOException When determining the region count fails.
+   */
+  public static void initTableReduceJob(String table,
+    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
+  throws IOException {
+    job.setOutputFormat(TableOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(TableOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(Put.class);
+    if (partitioner == HRegionPartitioner.class) {
+      job.setPartitionerClass(HRegionPartitioner.class);
+      HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+      int regions = outputTable.getRegionsInfo().size();
+      if (job.getNumReduceTasks() > regions) {
+        job.setNumReduceTasks(outputTable.getRegionsInfo().size());
+      }
+    } else if (partitioner != null) {
+      job.setPartitionerClass(partitioner);
+    }
+  }
+
+  /**
+   * Ensures that the given number of reduce tasks for the given job
+   * configuration does not exceed the number of regions for the given table.
+   *
+   * @param table  The table to get the region count for.
+   * @param job  The current job configuration to adjust.
+   * @throws IOException When retrieving the table details fails.
+   */
+  public static void limitNumReduceTasks(String table, JobConf job)
+  throws IOException {
+    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+    int regions = outputTable.getRegionsInfo().size();
+    if (job.getNumReduceTasks() > regions)
+      job.setNumReduceTasks(regions);
+  }
+
+  /**
+   * Ensures that the given number of map tasks for the given job
+   * configuration does not exceed the number of regions for the given table.
+   *
+   * @param table  The table to get the region count for.
+   * @param job  The current job configuration to adjust.
+   * @throws IOException When retrieving the table details fails.
+   */
+  public static void limitNumMapTasks(String table, JobConf job)
+  throws IOException {
+    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+    int regions = outputTable.getRegionsInfo().size();
+    if (job.getNumMapTasks() > regions)
+      job.setNumMapTasks(regions);
+  }
+
+  /**
+   * Sets the number of reduce tasks for the given job configuration to the
+   * number of regions the given table has.
+   *
+   * @param table  The table to get the region count for.
+   * @param job  The current job configuration to adjust.
+   * @throws IOException When retrieving the table details fails.
+   */
+  public static void setNumReduceTasks(String table, JobConf job)
+  throws IOException {
+    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+    int regions = outputTable.getRegionsInfo().size();
+    job.setNumReduceTasks(regions);
+  }
+
+  /**
+   * Sets the number of map tasks for the given job configuration to the
+   * number of regions the given table has.
+   *
+   * @param table  The table to get the region count for.
+   * @param job  The current job configuration to adjust.
+   * @throws IOException When retrieving the table details fails.
+   */
+  public static void setNumMapTasks(String table, JobConf job)
+  throws IOException {
+    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+    int regions = outputTable.getRegionsInfo().size();
+    job.setNumMapTasks(regions);
+  }
+
+  /**
+   * Sets the number of rows to return and cache with each scanner iteration.
+   * Higher caching values will enable faster mapreduce jobs at the expense of
+   * requiring more heap to contain the cached rows.
+   *
+   * @param job The current job configuration to adjust.
+   * @param batchSize The number of rows to return in batch with each scanner
+   * iteration.
+   */
+  public static void setScannerCaching(JobConf job, int batchSize) {
+    job.setInt("hbase.client.scanner.caching", batchSize);
+  }
+}
\ No newline at end of file
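
A hedged end-to-end sketch tying these utilities together: initTableMapJob
sets up the scan side, initTableReduceJob the write side, and setScannerCaching
trades client heap for fewer scanner round trips. The table names, column and
the ResultToPutMap helper are invented for this example; IdentityTableReduce
and HRegionPartitioner are existing classes in this mapred package.

    import java.io.IOException;

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapred.HRegionPartitioner;
    import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
    import org.apache.hadoop.hbase.mapred.TableMap;
    import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class CopyTableJob {

      // Turns each scanned Result into a Put carrying the same cells, so the
      // identity reduce can write it to the target table.
      public static class ResultToPutMap
          implements TableMap<ImmutableBytesWritable, Put> {

        public void configure(JobConf job) {
        }

        public void map(ImmutableBytesWritable key, Result value,
            OutputCollector<ImmutableBytesWritable, Put> output,
            Reporter reporter) throws IOException {
          Put put = new Put(key.get());
          for (KeyValue kv : value.list()) {
            put.add(kv);
          }
          output.collect(key, put);
        }

        public void close() throws IOException {
        }
      }

      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(new HBaseConfiguration(), CopyTableJob.class);
        job.setJobName("copy-table");

        // Scan info:name from sourceTable and emit (row, Put) pairs.
        TableMapReduceUtil.initTableMapJob("sourceTable", "info:name",
            ResultToPutMap.class, ImmutableBytesWritable.class, Put.class, job);

        // Write the Puts to targetTable; HRegionPartitioner keeps the reduces
        // aligned with the target table's regions.
        TableMapReduceUtil.initTableReduceJob("targetTable",
            IdentityTableReduce.class, job, HRegionPartitioner.class);

        // Fetch more rows per scanner round trip at the cost of client heap.
        TableMapReduceUtil.setScannerCaching(job, 100);

        JobClient.runJob(job);
      }
    }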


