hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r580399 - in /lucene/hadoop/trunk/src/contrib/hbase/src: java/org/apache/hadoop/hbase/mapred/ test/org/apache/hadoop/hbase/ test/org/apache/hadoop/hbase/mapred/
Date Fri, 28 Sep 2007 16:12:25 GMT
Author: stack
Date: Fri Sep 28 09:12:24 2007
New Revision: 580399

URL: http://svn.apache.org/viewvc?rev=580399&view=rev
Log:
HADOOP-1913 Build a Lucene index on an HBase table
Files I failed to add/delete on original commit

Added:
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
Removed:
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java?rev=580399&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java Fri Sep 28 09:12:24 2007
@@ -0,0 +1,184 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Example table column indexing class.  Runs a mapreduce job to index
+ * specified table columns.
+ * <ul><li>Each row is modeled as a Lucene document: row key is indexed in
+ * its untokenized form, column name-value pairs are Lucene field name-value 
+ * pairs.</li>
+ * <li>A file passed on command line is used to populate an
+ * {@link IndexConfiguration} which is used to set various Lucene parameters,
+ * specify whether to optimize an index and which columns to index and/or
+ * store, in tokenized or untokenized form, etc. For an example, see the
+ * <code>createIndexConfContent</code> method in TestTableIndex
+ * </li>
+ * <li>The number of reduce tasks decides the number of indexes (partitions).
+ * The index(es) is stored in the output path of job configuration.</li>
+ * <li>The index build process is done in the reduce phase. Users can use
+ * the map phase to join rows from different tables or to pre-parse/analyze
+ * column content, etc.</li>
+ * </ul>
+ */
+public class BuildTableIndex {
+  private static final String USAGE = "Usage: BuildTableIndex " +
+    "-m <numMapTasks> -r <numReduceTasks>\n  -indexConf <iconfFile> " +
+    "-indexDir <indexDir>\n  -table <tableName> -columns <columnName1> " +
+    "[<columnName2> ...]";
+
+  /** Every option understood by {@link #run(String[])}; each takes a value. */
+  private static final String[] OPTIONS = {
+    "-m", "-r", "-indexConf", "-indexDir", "-table", "-columns"
+  };
+
+  /**
+   * Prints an error message followed by the usage string to stderr, then
+   * terminates the JVM with exit status -1.
+   *
+   * @param message description of what was wrong with the command line
+   */
+  private static void printUsage(String message) {
+    System.err.println(message);
+    System.err.println(USAGE);
+    System.exit(-1);
+  }
+
+  /**
+   * @param option command-line token to test
+   * @return true if the token is one of the supported options
+   */
+  private static boolean isOption(String option) {
+    for (int i = 0; i < OPTIONS.length; i++) {
+      if (OPTIONS[i].equals(option)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Parses the integer value of a task-count option, reporting a usage error
+   * rather than a raw NumberFormatException when it is not a number.
+   *
+   * @param option name of the option being parsed (used in the message)
+   * @param value text supplied for the option
+   * @return the parsed integer
+   */
+  private static int parseTaskCount(String option, String value) {
+    try {
+      return Integer.parseInt(value);
+    } catch (NumberFormatException e) {
+      printUsage("Value of option " + option + " must be an integer: " + value);
+      throw e; // only reached if printUsage did not terminate the JVM
+    }
+  }
+
+  /** Default constructor. */
+  public BuildTableIndex() {
+    super();
+  }
+
+  /**
+   * Parses the command line, optionally loads and validates the index
+   * configuration file, then creates the job and runs it through
+   * <code>JobClient.runJob</code>.
+   *
+   * @param args command-line arguments; see the usage string
+   * @throws IOException if the configuration file cannot be read or the job
+   * fails
+   */
+  public void run(String[] args) throws IOException {
+    if (args.length < 6) {
+      printUsage("Too few arguments");
+      return;
+    }
+
+    int numMapTasks = 1;
+    int numReduceTasks = 1;
+    String iconfFile = null;
+    String indexDir = null;
+    String tableName = null;
+    StringBuilder columnNames = null;
+
+    // parse args; every option consumes at least one following value, so the
+    // whole array is scanned and a dangling option is reported, not ignored
+    for (int i = 0; i < args.length; i++) {
+      String option = args[i];
+      if (!isOption(option)) {
+        printUsage("Unsupported option " + option);
+        return;
+      }
+      if (i + 1 >= args.length) {
+        printUsage("Missing value for option " + option);
+        return;
+      }
+
+      String value = args[++i];
+      if ("-m".equals(option)) {
+        numMapTasks = parseTaskCount(option, value);
+      } else if ("-r".equals(option)) {
+        numReduceTasks = parseTaskCount(option, value);
+      } else if ("-indexConf".equals(option)) {
+        iconfFile = value;
+      } else if ("-indexDir".equals(option)) {
+        indexDir = value;
+      } else if ("-table".equals(option)) {
+        tableName = value;
+      } else {
+        // -columns: gather every following token up to the next option into
+        // one space-separated list
+        columnNames = new StringBuilder(value);
+        while (i + 1 < args.length && !args[i + 1].startsWith("-")) {
+          columnNames.append(" ");
+          columnNames.append(args[++i]);
+        }
+      }
+    }
+
+    if (indexDir == null || tableName == null || columnNames == null) {
+      printUsage("Index directory, table name and at least one column must " +
+        "be specified");
+      return;
+    }
+
+    Configuration conf = new HBaseConfiguration();
+    if (iconfFile != null) {
+      // set index configuration content from a file
+      String content = readContent(iconfFile);
+      IndexConfiguration iconf = new IndexConfiguration();
+      // purely to validate, exception will be thrown if not valid
+      iconf.addFromXML(content);
+      conf.set("hbase.index.conf", content);
+    }
+
+    JobConf jobConf = createJob(conf, numMapTasks, numReduceTasks, indexDir,
+      tableName, columnNames.toString());
+    JobClient.runJob(jobConf);
+  }
+
+  /**
+   * Builds the job configuration for indexing a table.
+   *
+   * @param conf base configuration; may carry "hbase.index.conf"
+   * @param numMapTasks number of map tasks to request
+   * @param numReduceTasks number of reduce tasks, i.e. index partitions
+   * @param indexDir output path that receives the index(es)
+   * @param tableName table to index
+   * @param columnNames space-separated names of the columns to index
+   * @return the configured job
+   */
+  public JobConf createJob(Configuration conf, int numMapTasks,
+      int numReduceTasks, String indexDir, String tableName,
+      String columnNames) {
+    JobConf jobConf = new JobConf(conf, BuildTableIndex.class);
+    jobConf.setJobName("build index for table " + tableName);
+    jobConf.setNumMapTasks(numMapTasks);
+    // number of indexes to partition into
+    jobConf.setNumReduceTasks(numReduceTasks);
+
+    // use identity map (a waste, but just as an example)
+    IdentityTableMap.initJob(tableName, columnNames, IdentityTableMap.class,
+        jobConf);
+
+    // use IndexTableReduce to build a Lucene index
+    jobConf.setReducerClass(IndexTableReduce.class);
+    jobConf.setOutputPath(new Path(indexDir));
+    jobConf.setOutputFormat(IndexOutputFormat.class);
+
+    return jobConf;
+  }
+
+  /**
+   * Read xml file of indexing configurations.  The xml format is similar to
+   * hbase-default.xml and hadoop-default.xml. For an example configuration,
+   * see the <code>createIndexConfContent</code> method in TestTableIndex.
+   * A missing, empty or oversized file is reported as a usage error.
+   *
+   * @param fileName File to read.
+   * @return XML configuration read from file
+   * @throws IOException if the file cannot be opened or read
+   */
+  private String readContent(String fileName) throws IOException {
+    File file = new File(fileName);
+    if (!file.isFile()) {
+      printUsage("Index configuration file " + fileName + " does not exist");
+    }
+
+    long fileLength = file.length();
+    if (fileLength == 0) {
+      printUsage("Index configuration file " + fileName + " is empty");
+    }
+    if (fileLength > Integer.MAX_VALUE) {
+      // the content is held in a single byte array
+      printUsage("Index configuration file " + fileName + " is too large");
+    }
+
+    int length = (int) fileLength;
+    int bytesRead = 0;
+    byte[] bytes = new byte[length];
+    FileInputStream fis = new FileInputStream(file);
+
+    try {
+      // read entire file into content; stop early on end of stream
+      while (bytesRead < length) {
+        int read = fis.read(bytes, bytesRead, length - bytesRead);
+        if (read > 0) {
+          bytesRead += read;
+        } else {
+          break;
+        }
+      }
+    } finally {
+      fis.close();
+    }
+
+    return new String(bytes, 0, bytesRead, HConstants.UTF8_ENCODING);
+  }
+
+  /**
+   * Command-line entry point.
+   *
+   * @param args command-line arguments; see the usage string
+   * @throws IOException if the job cannot be set up or run
+   */
+  public static void main(String[] args) throws IOException {
+    BuildTableIndex build = new BuildTableIndex();
+    build.run(args);
+  }
+}
\ No newline at end of file

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java?rev=580399&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java Fri Sep 28 09:12:24 2007
@@ -0,0 +1,418 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
+
+/**
+ * Configuration parameters for building a Lucene index
+ */
+public class IndexConfiguration extends Configuration {
+  private static final Log LOG = LogFactory.getLog(IndexConfiguration.class);
+
+  static final String HBASE_COLUMN_NAME = "hbase.column.name";
+  static final String HBASE_COLUMN_STORE = "hbase.column.store";
+  static final String HBASE_COLUMN_INDEX = "hbase.column.index";
+  static final String HBASE_COLUMN_TOKENIZE = "hbase.column.tokenize";
+  static final String HBASE_COLUMN_BOOST = "hbase.column.boost";
+  static final String HBASE_COLUMN_OMIT_NORMS = "hbase.column.omit.norms";
+  static final String HBASE_INDEX_ROWKEY_NAME = "hbase.index.rowkey.name";
+  static final String HBASE_INDEX_ANALYZER_NAME = "hbase.index.analyzer.name";
+  static final String HBASE_INDEX_MAX_BUFFERED_DOCS =
+    "hbase.index.max.buffered.docs";
+  static final String HBASE_INDEX_MAX_BUFFERED_DELS =
+    "hbase.index.max.buffered.dels";
+  static final String HBASE_INDEX_MAX_FIELD_LENGTH =
+    "hbase.index.max.field.length";
+  static final String HBASE_INDEX_MAX_MERGE_DOCS =
+    "hbase.index.max.merge.docs";
+  static final String HBASE_INDEX_MERGE_FACTOR = "hbase.index.merge.factor";
+  // double ramBufferSizeMB;
+  static final String HBASE_INDEX_SIMILARITY_NAME =
+    "hbase.index.similarity.name";
+  static final String HBASE_INDEX_USE_COMPOUND_FILE =
+    "hbase.index.use.compound.file";
+  static final String HBASE_INDEX_OPTIMIZE = "hbase.index.optimize";
+
+  /**
+   * Per-column settings, held as string properties with typed accessors.
+   */
+  public static class ColumnConf extends Properties {
+    /**
+     * @param name property name
+     * @param defaultValue value returned when the property is missing or is
+     * neither "true" nor "false"
+     * @return the property as a boolean
+     */
+    boolean getBoolean(String name, boolean defaultValue) {
+      String valueString = getProperty(name);
+      if ("true".equals(valueString))
+        return true;
+      else if ("false".equals(valueString))
+        return false;
+      else
+        return defaultValue;
+    }
+
+    void setBoolean(String name, boolean value) {
+      setProperty(name, Boolean.toString(value));
+    }
+
+    /**
+     * @param name property name
+     * @param defaultValue value returned when the property is missing or is
+     * not a parsable float
+     * @return the property as a float
+     */
+    float getFloat(String name, float defaultValue) {
+      String valueString = getProperty(name);
+      if (valueString == null)
+        return defaultValue;
+      try {
+        return Float.parseFloat(valueString);
+      } catch (NumberFormatException e) {
+        return defaultValue;
+      }
+    }
+
+    void setFloat(String name, float value) {
+      setProperty(name, Float.toString(value));
+    }
+  }
+
+  /** Column name to its settings. */
+  private final Map<String, ColumnConf> columnMap =
+    new HashMap<String, ColumnConf>();
+
+  /** @return iterator over the names of all known columns */
+  public Iterator<String> columnNameIterator() {
+    return columnMap.keySet().iterator();
+  }
+
+  /** @return whether the column is indexed; defaults to true */
+  public boolean isIndex(String columnName) {
+    return getColumn(columnName).getBoolean(HBASE_COLUMN_INDEX, true);
+  }
+
+  public void setIndex(String columnName, boolean index) {
+    getColumn(columnName).setBoolean(HBASE_COLUMN_INDEX, index);
+  }
+
+  /** @return whether the column is stored; defaults to false */
+  public boolean isStore(String columnName) {
+    return getColumn(columnName).getBoolean(HBASE_COLUMN_STORE, false);
+  }
+
+  public void setStore(String columnName, boolean store) {
+    getColumn(columnName).setBoolean(HBASE_COLUMN_STORE, store);
+  }
+
+  /** @return whether the column is tokenized; defaults to true */
+  public boolean isTokenize(String columnName) {
+    return getColumn(columnName).getBoolean(HBASE_COLUMN_TOKENIZE, true);
+  }
+
+  public void setTokenize(String columnName, boolean tokenize) {
+    getColumn(columnName).setBoolean(HBASE_COLUMN_TOKENIZE, tokenize);
+  }
+
+  /** @return the boost of the column; defaults to 1.0 */
+  public float getBoost(String columnName) {
+    return getColumn(columnName).getFloat(HBASE_COLUMN_BOOST, 1.0f);
+  }
+
+  public void setBoost(String columnName, float boost) {
+    getColumn(columnName).setFloat(HBASE_COLUMN_BOOST, boost);
+  }
+
+  /** @return whether norms are omitted for the column; defaults to true */
+  public boolean isOmitNorms(String columnName) {
+    return getColumn(columnName).getBoolean(HBASE_COLUMN_OMIT_NORMS, true);
+  }
+
+  public void setOmitNorms(String columnName, boolean omitNorms) {
+    getColumn(columnName).setBoolean(HBASE_COLUMN_OMIT_NORMS, omitNorms);
+  }
+
+  /**
+   * Looks up the settings of a column. Note that a column which is not yet
+   * known is registered with empty settings, so even the read accessors
+   * above add the column to {@link #columnNameIterator()}.
+   *
+   * @param columnName name of the column
+   * @return the settings of the column, never null
+   */
+  private ColumnConf getColumn(String columnName) {
+    ColumnConf column = columnMap.get(columnName);
+    if (column == null) {
+      column = new ColumnConf();
+      columnMap.put(columnName, column);
+    }
+    return column;
+  }
+
+  /** @return analyzer class name; defaults to Lucene's StandardAnalyzer */
+  public String getAnalyzerName() {
+    return get(HBASE_INDEX_ANALYZER_NAME,
+        "org.apache.lucene.analysis.standard.StandardAnalyzer");
+  }
+
+  public void setAnalyzerName(String analyzerName) {
+    set(HBASE_INDEX_ANALYZER_NAME, analyzerName);
+  }
+
+  /** @return max buffered delete terms; defaults to 1000 */
+  public int getMaxBufferedDeleteTerms() {
+    return getInt(HBASE_INDEX_MAX_BUFFERED_DELS, 1000);
+  }
+
+  public void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
+    setInt(HBASE_INDEX_MAX_BUFFERED_DELS, maxBufferedDeleteTerms);
+  }
+
+  /** @return max buffered docs; defaults to 10 */
+  public int getMaxBufferedDocs() {
+    return getInt(HBASE_INDEX_MAX_BUFFERED_DOCS, 10);
+  }
+
+  public void setMaxBufferedDocs(int maxBufferedDocs) {
+    setInt(HBASE_INDEX_MAX_BUFFERED_DOCS, maxBufferedDocs);
+  }
+
+  /** @return max field length; defaults to Integer.MAX_VALUE */
+  public int getMaxFieldLength() {
+    return getInt(HBASE_INDEX_MAX_FIELD_LENGTH, Integer.MAX_VALUE);
+  }
+
+  public void setMaxFieldLength(int maxFieldLength) {
+    setInt(HBASE_INDEX_MAX_FIELD_LENGTH, maxFieldLength);
+  }
+
+  /** @return max merge docs; defaults to Integer.MAX_VALUE */
+  public int getMaxMergeDocs() {
+    return getInt(HBASE_INDEX_MAX_MERGE_DOCS, Integer.MAX_VALUE);
+  }
+
+  public void setMaxMergeDocs(int maxMergeDocs) {
+    setInt(HBASE_INDEX_MAX_MERGE_DOCS, maxMergeDocs);
+  }
+
+  /** @return merge factor; defaults to 10 */
+  public int getMergeFactor() {
+    return getInt(HBASE_INDEX_MERGE_FACTOR, 10);
+  }
+
+  public void setMergeFactor(int mergeFactor) {
+    setInt(HBASE_INDEX_MERGE_FACTOR, mergeFactor);
+  }
+
+  /** @return name of the field holding the row key; defaults to "ROWKEY" */
+  public String getRowkeyName() {
+    return get(HBASE_INDEX_ROWKEY_NAME, "ROWKEY");
+  }
+
+  public void setRowkeyName(String rowkeyName) {
+    set(HBASE_INDEX_ROWKEY_NAME, rowkeyName);
+  }
+
+  /** @return similarity class name, or null when none is configured */
+  public String getSimilarityName() {
+    return get(HBASE_INDEX_SIMILARITY_NAME, null);
+  }
+
+  public void setSimilarityName(String similarityName) {
+    set(HBASE_INDEX_SIMILARITY_NAME, similarityName);
+  }
+
+  /** @return whether the compound file format is used; defaults to false */
+  public boolean isUseCompoundFile() {
+    return getBoolean(HBASE_INDEX_USE_COMPOUND_FILE, false);
+  }
+
+  public void setUseCompoundFile(boolean useCompoundFile) {
+    setBoolean(HBASE_INDEX_USE_COMPOUND_FILE, useCompoundFile);
+  }
+
+  /** @return whether the index is optimized when done; defaults to true */
+  public boolean doOptimize() {
+    return getBoolean(HBASE_INDEX_OPTIMIZE, true);
+  }
+
+  public void setDoOptimize(boolean doOptimize) {
+    setBoolean(HBASE_INDEX_OPTIMIZE, doOptimize);
+  }
+
+  /**
+   * Adds the properties and column settings found in the given XML text.
+   * Top-level &lt;property&gt; elements are set on this configuration and
+   * &lt;column&gt; elements are registered as column settings.
+   *
+   * @param content XML text with a &lt;configuration&gt; root element
+   * @throws RuntimeException wrapping any failure to parse the content
+   */
+  public void addFromXML(String content) {
+    try {
+      // NOTE(review): the parser is created with default settings; if the
+      // content can ever come from an untrusted source, DTDs and external
+      // entities should be disabled on the factory.
+      DocumentBuilder builder = DocumentBuilderFactory.newInstance()
+          .newDocumentBuilder();
+
+      // Parse from characters rather than content.getBytes(): that call uses
+      // the platform default charset, which corrupts non-ASCII text whenever
+      // the default is not what the parser assumes.
+      Document doc = builder.parse(
+          new org.xml.sax.InputSource(new java.io.StringReader(content)));
+
+      Element root = doc.getDocumentElement();
+      if (!"configuration".equals(root.getTagName())) {
+        LOG.fatal("bad conf file: top-level element not <configuration>");
+      }
+
+      NodeList props = root.getChildNodes();
+      for (int i = 0; i < props.getLength(); i++) {
+        Node propNode = props.item(i);
+        if (!(propNode instanceof Element)) {
+          continue;
+        }
+
+        Element prop = (Element) propNode;
+        if ("property".equals(prop.getTagName())) {
+          propertyFromXML(prop, null);
+        } else if ("column".equals(prop.getTagName())) {
+          columnConfFromXML(prop);
+        } else {
+          LOG.warn("bad conf content: element neither <property> nor <column>");
+        }
+      }
+    } catch (Exception e) {
+      // pass the exception itself so that the stack trace is logged too
+      LOG.fatal("error parsing conf content: " + e, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Reads the &lt;name&gt; and &lt;value&gt; children of a property element
+   * and stores the pair. A property lacking a non-empty name or value is
+   * skipped.
+   *
+   * @param prop the &lt;property&gt; element
+   * @param properties target of the pair, or null to set it on this
+   * configuration
+   */
+  private void propertyFromXML(Element prop, Properties properties) {
+    NodeList fields = prop.getChildNodes();
+    String attr = null;
+    String value = null;
+
+    for (int j = 0; j < fields.getLength(); j++) {
+      Node fieldNode = fields.item(j);
+      if (!(fieldNode instanceof Element)) {
+        continue;
+      }
+
+      Element field = (Element) fieldNode;
+      // an empty <name/> has no first child; guard it like <value> below
+      if ("name".equals(field.getTagName()) && field.hasChildNodes()) {
+        attr = ((Text) field.getFirstChild()).getData();
+      }
+      if ("value".equals(field.getTagName()) && field.hasChildNodes()) {
+        value = ((Text) field.getFirstChild()).getData();
+      }
+    }
+
+    if (attr != null && value != null) {
+      if (properties == null) {
+        set(attr, value);
+      } else {
+        properties.setProperty(attr, value);
+      }
+    }
+  }
+
+  /**
+   * Builds column settings from the &lt;property&gt; children of a column
+   * element and registers them under the value of "hbase.column.name". A
+   * column without that property is dropped with a warning.
+   *
+   * @param column the &lt;column&gt; element
+   */
+  private void columnConfFromXML(Element column) {
+    ColumnConf columnConf = new ColumnConf();
+    NodeList props = column.getChildNodes();
+    for (int i = 0; i < props.getLength(); i++) {
+      Node propNode = props.item(i);
+      if (!(propNode instanceof Element)) {
+        continue;
+      }
+
+      Element prop = (Element) propNode;
+      if ("property".equals(prop.getTagName())) {
+        propertyFromXML(prop, columnConf);
+      } else {
+        LOG.warn("bad conf content: element not <property>");
+      }
+    }
+
+    if (columnConf.getProperty(HBASE_COLUMN_NAME) != null) {
+      columnMap.put(columnConf.getProperty(HBASE_COLUMN_NAME), columnConf);
+    } else {
+      LOG.warn("bad column conf: name not specified");
+    }
+  }
+
+  /**
+   * Writes this configuration, including column settings, as XML.
+   *
+   * @param out stream that receives the XML
+   * @throws IOException declared for callers; failures seen here are
+   * rethrown as RuntimeException
+   */
+  public void write(OutputStream out) throws IOException {
+    transformTo(new StreamResult(out));
+  }
+
+  /**
+   * Serializes the XML form of this configuration into the given result.
+   *
+   * @param result destination of the XML
+   * @throws RuntimeException wrapping any failure to build or write the XML
+   */
+  private void transformTo(StreamResult result) {
+    try {
+      DOMSource source = new DOMSource(writeDocument());
+      TransformerFactory transFactory = TransformerFactory.newInstance();
+      Transformer transformer = transFactory.newTransformer();
+      transformer.transform(source, result);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Builds a DOM document holding every property of this configuration
+   * followed by one &lt;column&gt; element per known column.
+   *
+   * @return the document
+   * @throws RuntimeException wrapping any failure to build the document
+   */
+  private Document writeDocument() {
+    Iterator<Map.Entry<String, String>> iter = iterator();
+    try {
+      Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
+          .newDocument();
+      Element conf = doc.createElement("configuration");
+      doc.appendChild(conf);
+      conf.appendChild(doc.createTextNode("\n"));
+
+      while (iter.hasNext()) {
+        Map.Entry<String, String> entry = iter.next();
+        writeProperty(doc, conf, entry.getKey(), entry.getValue());
+      }
+
+      Iterator<String> columnIter = columnNameIterator();
+      while (columnIter.hasNext()) {
+        writeColumn(doc, conf, columnIter.next());
+      }
+
+      return doc;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Appends a &lt;property&gt; element with &lt;name&gt; and &lt;value&gt;
+   * children to the parent, followed by a newline.
+   */
+  private void writeProperty(Document doc, Element parent, String name,
+      String value) {
+    Element propNode = doc.createElement("property");
+    parent.appendChild(propNode);
+
+    Element nameNode = doc.createElement("name");
+    nameNode.appendChild(doc.createTextNode(name));
+    propNode.appendChild(nameNode);
+
+    Element valueNode = doc.createElement("value");
+    valueNode.appendChild(doc.createTextNode(value));
+    propNode.appendChild(valueNode);
+
+    parent.appendChild(doc.createTextNode("\n"));
+  }
+
+  /**
+   * Appends a &lt;column&gt; element holding the string-valued settings of
+   * the named column to the parent.
+   */
+  private void writeColumn(Document doc, Element parent, String columnName) {
+    Element column = doc.createElement("column");
+    parent.appendChild(column);
+    column.appendChild(doc.createTextNode("\n"));
+
+    ColumnConf columnConf = getColumn(columnName);
+    for (Map.Entry<Object, Object> entry : columnConf.entrySet()) {
+      if (entry.getKey() instanceof String
+          && entry.getValue() instanceof String) {
+        writeProperty(doc, column, (String) entry.getKey(), (String) entry
+            .getValue());
+      }
+    }
+  }
+
+  /** @return this configuration, including column settings, as XML text */
+  @Override
+  public String toString() {
+    StringWriter writer = new StringWriter();
+    transformTo(new StreamResult(writer));
+    return writer.toString();
+  }
+}
\ No newline at end of file

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java?rev=580399&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java Fri Sep 28 09:12:24 2007
@@ -0,0 +1,163 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormatBase;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.search.Similarity;
+
+/**
+ * Create a local index, unwrap Lucene documents created by reduce, add them to
+ * the index, and copy the index to the destination.
+ */
+public class IndexOutputFormat extends
+    OutputFormatBase<Text, LuceneDocumentWrapper> {
+  static final Log LOG = LogFactory.getLog(IndexOutputFormat.class);
+
+  /**
+   * Creates an IOException that keeps the original failure as its cause.
+   * Uses initCause because this code base predates the IOException
+   * constructor that takes a cause.
+   *
+   * @param message description of what failed
+   * @param cause the underlying failure
+   * @return the exception, ready to be thrown
+   */
+  private static IOException ioException(String message, Throwable cause) {
+    IOException ioe = new IOException(message);
+    ioe.initCause(cause);
+    return ioe;
+  }
+
+  /**
+   * Returns a writer that adds each received document to a Lucene index
+   * built on local disk and, on close, optionally optimizes the index and
+   * moves it to its permanent location under the job output path.
+   *
+   * @param fs file system holding the job output
+   * @param job job configuration; "hbase.index.conf" may hold the index
+   * configuration as XML
+   * @param name name of this output within the job output path
+   * @param progress called after every document to report liveness
+   * @return the record writer
+   * @throws IOException if the analyzer or similarity cannot be created or
+   * the index cannot be opened
+   */
+  @Override
+  public RecordWriter<Text, LuceneDocumentWrapper> getRecordWriter(
+    final FileSystem fs, JobConf job, String name, final Progressable progress)
+  throws IOException {
+
+    final Path perm = new Path(job.getOutputPath(), name);
+    final Path temp = job.getLocalPath("index/_"
+        + Integer.toString(new Random().nextInt()));
+
+    LOG.info("To index into " + perm);
+
+    // delete old, if any
+    fs.delete(perm);
+
+    final IndexConfiguration indexConf = new IndexConfiguration();
+    String content = job.get("hbase.index.conf");
+    if (content != null) {
+      indexConf.addFromXML(content);
+    }
+
+    String analyzerName = indexConf.getAnalyzerName();
+    Analyzer analyzer;
+    try {
+      Class<?> analyzerClass = Class.forName(analyzerName);
+      analyzer = (Analyzer) analyzerClass.newInstance();
+    } catch (Exception e) {
+      throw ioException("Error in creating an analyzer object "
+          + analyzerName, e);
+    }
+
+    // build locally first
+    final IndexWriter writer = new IndexWriter(fs.startLocalOutput(perm, temp)
+        .toString(), analyzer, true);
+
+    // no delete, so no need for maxBufferedDeleteTerms
+    writer.setMaxBufferedDocs(indexConf.getMaxBufferedDocs());
+    writer.setMaxFieldLength(indexConf.getMaxFieldLength());
+    writer.setMaxMergeDocs(indexConf.getMaxMergeDocs());
+    writer.setMergeFactor(indexConf.getMergeFactor());
+    String similarityName = indexConf.getSimilarityName();
+    if (similarityName != null) {
+      try {
+        Class<?> similarityClass = Class.forName(similarityName);
+        Similarity similarity = (Similarity) similarityClass.newInstance();
+        writer.setSimilarity(similarity);
+      } catch (Exception e) {
+        throw ioException("Error in creating a similarity object "
+            + similarityName, e);
+      }
+    }
+    writer.setUseCompoundFile(indexConf.isUseCompoundFile());
+
+    return new RecordWriter<Text, LuceneDocumentWrapper>() {
+      // written by the thread calling close(), read by the heartbeat thread;
+      // volatile so that the heartbeat thread is sure to see the update
+      private volatile boolean closed;
+      private long docCount = 0;
+
+      public void write(@SuppressWarnings("unused") Text key,
+        LuceneDocumentWrapper value)
+      throws IOException {
+        // unwrap and index doc
+        Document doc = value.get();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(" Indexing [" + doc + "]");
+        }
+
+        writer.addDocument(doc);
+        docCount++;
+        progress.progress();
+      }
+
+      public void close(final Reporter reporter) throws IOException {
+        // spawn a thread to give progress heartbeats
+        Thread prog = new Thread() {
+          @Override
+          public void run() {
+            while (!closed) {
+              try {
+                reporter.setStatus("closing");
+                Thread.sleep(1000);
+              } catch (InterruptedException e) {
+                // woken up early; the loop condition decides whether to stop
+                continue;
+              } catch (Throwable e) {
+                // best effort only: stop the heartbeat if reporting fails
+                return;
+              }
+            }
+          }
+        };
+
+        try {
+          prog.start();
+
+          // optimize index
+          if (indexConf.doOptimize()) {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("Optimizing index.");
+            }
+            writer.optimize();
+          }
+
+          // close index
+          writer.close();
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Done indexing " + docCount + " docs.");
+          }
+
+          // copy to perm destination in dfs
+          fs.completeLocalOutput(perm, temp);
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Copy done.");
+          }
+        } finally {
+          closed = true;
+          // wake the heartbeat thread so it stops now, not after its sleep
+          prog.interrupt();
+        }
+      }
+    };
+  }
+}
\ No newline at end of file

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java?rev=580399&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java Fri Sep 28 09:12:24 2007
@@ -0,0 +1,107 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+
+/**
+ * Reducer that turns every row into one Lucene document. The documents are
+ * consumed by IndexOutputFormat, which builds the Lucene index from them.
+ */
+public class IndexTableReduce extends MapReduceBase implements
+    Reducer<Text, MapWritable, Text, LuceneDocumentWrapper> {
+  private static final Logger LOG = Logger.getLogger(IndexTableReduce.class);
+
+  private IndexConfiguration indexConf;
+
+  /**
+   * Creates the index configuration and, when the job carries the
+   * "hbase.index.conf" property, loads its XML content into it.
+   *
+   * @param job configuration of the running job
+   */
+  public void configure(JobConf job) {
+    super.configure(job);
+    indexConf = new IndexConfiguration();
+    final String xml = job.get("hbase.index.conf");
+    if (xml != null) {
+      indexConf.addFromXML(xml);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Index conf: " + indexConf);
+    }
+  }
+
+  /**
+   * Delegates to the base class; this reducer adds nothing of its own.
+   *
+   * @throws IOException if the base class fails to close
+   */
+  public void close() throws IOException {
+    super.close();
+  }
+
+  /**
+   * Collects one Lucene document for the given row. The document holds a
+   * field for the row key plus one field per column found in the values.
+   * Nothing is collected when there are no values.
+   *
+   * @param key row key
+   * @param values column name/value maps that belong to the row
+   * @param output receives the row key and the wrapped document
+   * @param reporter not used
+   * @throws IOException if a column value cannot be decoded or the
+   * collector fails
+   */
+  public void reduce(Text key, Iterator<MapWritable> values,
+      OutputCollector<Text, LuceneDocumentWrapper> output, Reporter reporter)
+      throws IOException {
+    if (!values.hasNext()) {
+      return;
+    }
+
+    Document doc = new Document();
+    doc.add(newRowKeyField(key));
+
+    while (values.hasNext()) {
+      MapWritable columns = values.next();
+      // each column (name-value pair) is a field (name-value pair)
+      for (Map.Entry<Writable, Writable> column : columns.entrySet()) {
+        doc.add(newColumnField(column));
+      }
+    }
+    output.collect(key, new LuceneDocumentWrapper(doc));
+  }
+
+  /*
+   * Builds the stored, untokenized field that holds the row key; norms are
+   * omitted for it. The row key is already UTF-8 encoded.
+   */
+  private Field newRowKeyField(Text key) {
+    Field rowKey = new Field(indexConf.getRowkeyName(), key.toString(),
+      Field.Store.YES, Field.Index.UN_TOKENIZED);
+    rowKey.setOmitNorms(true);
+    return rowKey;
+  }
+
+  /*
+   * Builds the field for one column, applying the store, index, boost and
+   * norms settings the index configuration holds for that column.
+   */
+  private Field newColumnField(Map.Entry<Writable, Writable> column)
+      throws IOException {
+    // name is already UTF-8 encoded
+    String name = ((Text) column.getKey()).toString();
+    byte[] raw = ((ImmutableBytesWritable) column.getValue()).get();
+
+    Field.Store store;
+    if (indexConf.isStore(name)) {
+      store = Field.Store.YES;
+    } else {
+      store = Field.Store.NO;
+    }
+    Field.Index index = indexModeFor(name);
+
+    // UTF-8 encode value
+    String text = new String(raw, HConstants.UTF8_ENCODING);
+    Field field = new Field(name, text, store, index);
+    field.setBoost(indexConf.getBoost(name));
+    field.setOmitNorms(indexConf.isOmitNorms(name));
+    return field;
+  }
+
+  /*
+   * Picks the index mode of a column: not indexed, tokenized or untokenized.
+   */
+  private Field.Index indexModeFor(String name) {
+    if (!indexConf.isIndex(name)) {
+      return Field.Index.NO;
+    }
+    if (indexConf.isTokenize(name)) {
+      return Field.Index.TOKENIZED;
+    }
+    return Field.Index.UN_TOKENIZED;
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java?rev=580399&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java Fri Sep 28 09:12:24 2007
@@ -0,0 +1,51 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.lucene.document.Document;
+
+/**
+ * A utility class used to pass a lucene document from reduce to OutputFormat.
+ * It doesn't really serialize/deserialize a lucene document: both
+ * {@link #readFields(DataInput)} and {@link #write(DataOutput)} do nothing,
+ * so the wrapped document is only available on the instance it was given to.
+ */
+class LuceneDocumentWrapper implements Writable {
+  // assigned once in the constructor and never replaced
+  private final Document doc;
+
+  /**
+   * @param doc the document to carry
+   */
+  public LuceneDocumentWrapper(Document doc) {
+    this.doc = doc;
+  }
+
+  /**
+   * @return the document passed to the constructor
+   */
+  public Document get() {
+    return doc;
+  }
+
+  /**
+   * Does nothing; no document is read from the input.
+   *
+   * @param in ignored
+   * @throws IOException never thrown by this implementation
+   */
+  public void readFields(DataInput in) throws IOException {
+    // intentionally left blank
+  }
+
+  /**
+   * Does nothing; the document is not written to the output.
+   *
+   * @param out ignored
+   * @throws IOException never thrown by this implementation
+   */
+  public void write(DataOutput out) throws IOException {
+    // intentionally left blank
+  }
+}
\ No newline at end of file

Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java?rev=580399&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java Fri Sep 28 09:12:24 2007
@@ -0,0 +1,297 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import junit.framework.TestSuite;
+import junit.textui.TestRunner;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseAdmin;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HScannerInterface;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.HTable;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.MultiRegionTable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MultiSearcher;
+import org.apache.lucene.search.Searchable;
+import org.apache.lucene.search.Searcher;
+import org.apache.lucene.search.TermQuery;
+
+/**
+ * Test Map/Reduce job to build index over HBase table
+ */
+public class TestTableIndex extends HBaseTestCase {
+  private static final Log LOG = LogFactory.getLog(TestTableIndex.class);
+
+  static final String TABLE_NAME = "moretest";
+  static final String INPUT_COLUMN = "contents:";
+  static final Text TEXT_INPUT_COLUMN = new Text(INPUT_COLUMN);
+  static final String OUTPUT_COLUMN = "text:";
+  static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN);
+  static final String ROWKEY_NAME = "key";
+  static final String INDEX_DIR = "testindex";
+
+  private HTableDescriptor desc;
+
+  private MiniDFSCluster dfsCluster = null;
+  private FileSystem fs;
+  private Path dir;
+  private MiniHBaseCluster hCluster = null;
+
+  /**
+   * Starts a mini DFS cluster and a mini HBase cluster on top of it, creates
+   * the test table and populates it until it spans more than one region.
+   * <p>
+   * When any step after the DFS start fails, whether with an exception or
+   * with a failed assertion, every cluster started so far is shut down
+   * again before the failure propagates.
+   *
+   * @throws Exception if a cluster cannot be started or the table cannot be
+   * created and populated
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    // This size should make it so we always split using the addContent
+    // below. After adding all data, the first region is 1.3M
+    conf.setLong("hbase.hregion.max.filesize", 256 * 1024);
+
+    desc = new HTableDescriptor(TABLE_NAME);
+    desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
+    desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
+
+    dfsCluster = new MiniDFSCluster(conf, 1, true, (String[]) null);
+    boolean ready = false;
+    try {
+      fs = dfsCluster.getFileSystem();
+
+      dir = new Path("/hbase");
+      fs.mkdirs(dir);
+
+      // Start up HBase cluster
+      hCluster = new MiniHBaseCluster(conf, 1, dfsCluster);
+
+      // Create a table.
+      HBaseAdmin admin = new HBaseAdmin(conf);
+      admin.createTable(desc);
+
+      // Populate a table into multiple regions
+      MultiRegionTable.makeMultiRegionTable(conf, hCluster, null, TABLE_NAME,
+        INPUT_COLUMN);
+
+      // Verify table indeed has multiple regions
+      HTable table = new HTable(conf, new Text(TABLE_NAME));
+      Text[] startKeys = table.getStartKeys();
+      assertTrue(startKeys.length > 1);
+      ready = true;
+    } finally {
+      // A flag plus finally (rather than catch (Exception)) also covers a
+      // failed assertTrue above, which is thrown as an Error.
+      if (!ready) {
+        try {
+          if (hCluster != null) {
+            hCluster.shutdown();
+            hCluster = null;
+          }
+        } finally {
+          if (dfsCluster != null) {
+            dfsCluster.shutdown();
+            dfsCluster = null;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Runs the inherited tear down, then shuts down the HBase cluster and the
+   * DFS cluster if they were started. Each step runs even when an earlier
+   * one throws, so a failure cannot leave a cluster running.
+   *
+   * @throws Exception if the inherited tear down or a shutdown fails
+   */
+  @Override
+  public void tearDown() throws Exception {
+    try {
+      super.tearDown();
+    } finally {
+      try {
+        if (hCluster != null) {
+          hCluster.shutdown();
+        }
+      } finally {
+        if (dfsCluster != null) {
+          dfsCluster.shutdown();
+        }
+      }
+    }
+  }
+
+  /**
+   * Runs a map/reduce job on a mini MR cluster that builds a Lucene index
+   * over the test table, then verifies the index against the table.
+   *
+   * @throws IOException
+   */
+  @SuppressWarnings("static-access")
+  public void testTableIndex() throws IOException {
+    final long firstK = 32;
+    LOG.info("Print table contents before map/reduce");
+    scanTable(conf, firstK);
+
+    @SuppressWarnings("deprecation")
+    MiniMRCluster mr = new MiniMRCluster(2, fs.getUri().toString(), 1);
+
+    // the index build reads its settings from this configuration parameter
+    conf.set("hbase.index.conf", createIndexConfContent());
+
+    try {
+      JobConf job = new JobConf(conf, TestTableIndex.class);
+      job.setJobName("index column contents");
+      job.setNumMapTasks(2);
+      // the number of reduce tasks is the number of index partitions
+      job.setNumReduceTasks(1);
+
+      // identity map: wasteful, but good enough as an example
+      IdentityTableMap.initJob(TABLE_NAME, INPUT_COLUMN,
+          IdentityTableMap.class, job);
+
+      // IndexTableReduce plus IndexOutputFormat build the Lucene index
+      job.setReducerClass(IndexTableReduce.class);
+      job.setOutputPath(new Path(INDEX_DIR));
+      job.setOutputFormat(IndexOutputFormat.class);
+
+      JobClient.runJob(job);
+
+    } finally {
+      mr.shutdown();
+    }
+
+    LOG.info("Print table contents after map/reduce");
+    scanTable(conf, firstK);
+
+    // verify index results
+    verify(conf);
+  }
+
+  /*
+   * Assembles the XML index configuration used by this test, loads it into
+   * an IndexConfiguration and returns that configuration's string form.
+   */
+  private String createIndexConfContent() {
+    // settings of the single indexed column
+    String columnXml = "<column><property>" +
+      "<name>hbase.column.name</name><value>" + INPUT_COLUMN +
+      "</value></property>" +
+      "<property><name>hbase.column.store</name> " +
+      "<value>true</value></property>" +
+      "<property><name>hbase.column.index</name>" +
+      "<value>true</value></property>" +
+      "<property><name>hbase.column.tokenize</name>" +
+      "<value>false</value></property>" +
+      "<property><name>hbase.column.boost</name>" +
+      "<value>3</value></property>" +
+      "<property><name>hbase.column.omit.norms</name>" +
+      "<value>false</value></property></column>";
+
+    // settings of the index as a whole
+    String indexXml =
+      "<property><name>hbase.index.rowkey.name</name><value>" +
+      ROWKEY_NAME + "</value></property>" +
+      "<property><name>hbase.index.max.buffered.docs</name>" +
+      "<value>500</value></property>" +
+      "<property><name>hbase.index.max.field.length</name>" +
+      "<value>10000</value></property>" +
+      "<property><name>hbase.index.merge.factor</name>" +
+      "<value>10</value></property>" +
+      "<property><name>hbase.index.use.compound.file</name>" +
+      "<value>true</value></property>" +
+      "<property><name>hbase.index.optimize</name>" +
+      "<value>true</value></property>";
+
+    String xml = "<configuration>" + columnXml + indexXml +
+      "</configuration>";
+
+    IndexConfiguration parsed = new IndexConfiguration();
+    parsed.addFromXML(xml);
+    return parsed.toString();
+  }
+
+  /*
+   * Scans the whole test table and logs the row key and the column values
+   * of the first firstK rows; later rows are read but not logged.
+   */
+  private void scanTable(Configuration c, long firstK) throws IOException {
+    HTable table = new HTable(c, new Text(TABLE_NAME));
+    HScannerInterface scanner = table.obtainScanner(
+        new Text[] { TEXT_INPUT_COLUMN, TEXT_OUTPUT_COLUMN },
+        HConstants.EMPTY_START_ROW);
+    try {
+      HStoreKey rowKey = new HStoreKey();
+      TreeMap<Text, byte[]> cells = new TreeMap<Text, byte[]>();
+      for (long seen = 0; scanner.next(rowKey, cells); seen++) {
+        if (seen >= firstK) {
+          // past the logging limit; keep scanning to the end of the table
+          continue;
+        }
+        LOG.info("row: " + rowKey.getRow());
+        for (Map.Entry<Text, byte[]> cell : cells.entrySet()) {
+          LOG.info(" column: " + cell.getKey() + " value: "
+              + new String(cell.getValue(), HConstants.UTF8_ENCODING));
+        }
+      }
+    } finally {
+      scanner.close();
+    }
+  }
+
+  /*
+   * Copies the index to a local directory, opens a searcher over every
+   * index directory found there and checks that each table row is found
+   * exactly once by its row key and that the index holds as many documents
+   * as the table has rows.
+   */
+  private void verify(Configuration c) throws IOException {
+    String localName = "index_" + Integer.toString(new Random().nextInt());
+    Path localDir = new Path(this.testDir, localName);
+    this.fs.copyToLocalFile(new Path(INDEX_DIR), localDir);
+    Path [] indexDirs = this.localFs.listPaths(new Path [] {localDir});
+    Searcher searcher = null;
+    HScannerInterface scanner = null;
+    try {
+      if (indexDirs.length == 0) {
+        throw new IOException("no index directory found");
+      }
+      if (indexDirs.length == 1) {
+        File only = new File(indexDirs[0].toUri());
+        searcher = new IndexSearcher(only.getAbsolutePath());
+      } else {
+        // several partitions: search them as one
+        Searchable[] parts = new Searchable[indexDirs.length];
+        for (int i = 0; i < indexDirs.length; i++) {
+          File part = new File(indexDirs[i].toUri());
+          parts[i] = new IndexSearcher(part.getAbsolutePath());
+        }
+        searcher = new MultiSearcher(parts);
+      }
+
+      HTable table = new HTable(c, new Text(TABLE_NAME));
+      scanner = table.obtainScanner(
+          new Text[] { TEXT_INPUT_COLUMN, TEXT_OUTPUT_COLUMN },
+          HConstants.EMPTY_START_ROW);
+
+      HStoreKey rowKey = new HStoreKey();
+      TreeMap<Text, byte[]> cells = new TreeMap<Text, byte[]>();
+
+      IndexConfiguration indexConf = new IndexConfiguration();
+      String xml = c.get("hbase.index.conf");
+      if (xml != null) {
+        indexConf.addFromXML(xml);
+      }
+      String rowkeyName = indexConf.getRowkeyName();
+
+      int rows = 0;
+      for (; scanner.next(rowKey, cells); rows++) {
+        String row = rowKey.getRow().toString();
+        TermQuery byRowKey = new TermQuery(new Term(rowkeyName, row));
+        int hitCount = searcher.search(byRowKey).length();
+        assertEquals("check row " + row, 1, hitCount);
+      }
+      assertEquals("check number of rows", rows, searcher.maxDoc());
+    } finally {
+      if (searcher != null) {
+        searcher.close();
+      }
+      if (scanner != null) {
+        scanner.close();
+      }
+    }
+  }
+  /**
+   * @param args unused
+   */
+  public static void main(@SuppressWarnings("unused") String[] args) {
+    TestRunner.run(new TestSuite(TestTableIndex.class));
+  }
+}
\ No newline at end of file

Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java?rev=580399&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java Fri Sep 28 09:12:24 2007
@@ -0,0 +1,387 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.hbase.HBaseAdmin;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HScannerInterface;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.HTable;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.MultiRegionTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableMap;
+import org.apache.hadoop.hbase.mapred.TableOutputCollector;
+import org.apache.hadoop.hbase.mapred.TableReduce;
+import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
+
+/**
+ * Test Map/Reduce job over HBase tables
+ */
+public class TestTableMapReduce extends MultiRegionTable {
+  @SuppressWarnings("hiding")
+  private static final Log LOG =
+    LogFactory.getLog(TestTableMapReduce.class.getName());
+  
+  static final String SINGLE_REGION_TABLE_NAME = "srtest";
+  static final String MULTI_REGION_TABLE_NAME = "mrtest";
+  static final String INPUT_COLUMN = "contents:";
+  static final Text TEXT_INPUT_COLUMN = new Text(INPUT_COLUMN);
+  static final String OUTPUT_COLUMN = "text:";
+  static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN);
+  
+  private MiniDFSCluster dfsCluster = null;
+  private FileSystem fs;
+  private Path dir;
+  private MiniHBaseCluster hCluster = null;
+  
+  private static byte[][] values = null;
+  
+  static {
+    try {
+      values = new byte[][] {
+          "0123".getBytes(HConstants.UTF8_ENCODING),
+          "abcd".getBytes(HConstants.UTF8_ENCODING),
+          "wxyz".getBytes(HConstants.UTF8_ENCODING),
+          "6789".getBytes(HConstants.UTF8_ENCODING)
+      };
+    } catch (UnsupportedEncodingException e) {
+      fail();
+    }
+  }
+  
+  /** constructor */
+  public TestTableMapReduce() {
+    super();
+
+    // Make lease timeout longer, lease checks less frequent
+    conf.setInt("hbase.master.lease.period", 10 * 1000);
+    conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    // This size is picked so the table is split into two
+    // after addContent in testMultiRegionTableMapReduce.
+    conf.setLong("hbase.hregion.max.filesize", 256 * 1024);
+    dfsCluster = new MiniDFSCluster(conf, 1, true, (String[])null);
+    try {
+      fs = dfsCluster.getFileSystem();
+      dir = new Path("/hbase");
+      fs.mkdirs(dir);
+      // Start up HBase cluster
+      hCluster = new MiniHBaseCluster(conf, 1, dfsCluster);
+      LOG.info("Master is at " + this.conf.get(HConstants.MASTER_ADDRESS));
+    } catch (Exception e) {
+      if (dfsCluster != null) {
+        dfsCluster.shutdown();
+        dfsCluster = null;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Runs the inherited tear down, shuts down the HBase and DFS clusters if
+   * they were started and closes the file system. Each step runs even when
+   * an earlier one throws, so a failure cannot leave a cluster running. An
+   * IOException from closing the file system is only logged.
+   *
+   * @throws Exception if the inherited tear down or a shutdown fails
+   */
+  @Override
+  public void tearDown() throws Exception {
+    try {
+      super.tearDown();
+    } finally {
+      try {
+        if (hCluster != null) {
+          hCluster.shutdown();
+        }
+      } finally {
+        try {
+          if (dfsCluster != null) {
+            dfsCluster.shutdown();
+          }
+        } finally {
+          if (fs != null) {
+            try {
+              fs.close();
+            } catch (IOException e) {
+              LOG.info("During tear down got a " + e.getMessage());
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Mapper that passes each row key on to reduce together with the
+   * reversed value of the input column, stored under the output column.
+   */
+  public static class ProcessContentsMapper extends TableMap {
+
+    /** constructor */
+    public ProcessContentsMapper() {
+      super();
+    }
+
+    /**
+     * Pass the key, and reversed value to reduce
+     *
+     * @throws IOException if the row does not hold exactly the one
+     * expected input column
+     * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void map(HStoreKey key, MapWritable value,
+        TableOutputCollector output,
+        @SuppressWarnings("unused") Reporter reporter) throws IOException {
+
+      Text row = key.getRow();
+
+      if (value.size() != 1) {
+        throw new IOException("There should only be one input column");
+      }
+
+      Text[] columns = value.keySet().toArray(new Text[value.size()]);
+      Text column = columns[0];
+      if (!column.equals(TEXT_INPUT_COLUMN)) {
+        throw new IOException("Wrong input column. Expected: " + INPUT_COLUMN
+            + " but got: " + column);
+      }
+
+      // Decode the original value and reverse it character by character
+      byte[] raw = ((ImmutableBytesWritable) value.get(column)).get();
+      String original = new String(raw, HConstants.UTF8_ENCODING);
+      final int length = original.length();
+      char[] reversed = new char[length];
+      for (int i = 0; i < length; i++) {
+        reversed[i] = original.charAt(length - 1 - i);
+      }
+
+      // Collect the reversed value under the output column
+      MapWritable outval = new MapWritable();
+      byte[] encoded = new String(reversed).getBytes(HConstants.UTF8_ENCODING);
+      outval.put(TEXT_OUTPUT_COLUMN, new ImmutableBytesWritable(encoded));
+
+      output.collect(row, outval);
+    }
+  }
+  
+  /**
+   * Test hbase mapreduce jobs against single region and multi-region tables.
+   * The single-region run comes first; both runs happen in one test method
+   * and therefore between a single setUp and tearDown.
+   * @throws IOException
+   */
+  public void testTableMapReduce() throws IOException {
+    localTestSingleRegionTable();
+    localTestMultiRegionTable();
+  }
+
+  /*
+   * Creates the single region test table, inserts one row per entry of
+   * values, runs the map/reduce job over it on a mini MR cluster and
+   * verifies the result.
+   * @throws IOException
+   */
+  private void localTestSingleRegionTable() throws IOException {
+    HTableDescriptor desc = new HTableDescriptor(SINGLE_REGION_TABLE_NAME);
+    desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
+    desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
+
+    // Create a table.
+    new HBaseAdmin(this.conf).createTable(desc);
+
+    // insert some data into the test table
+    HTable table = new HTable(conf, new Text(SINGLE_REGION_TABLE_NAME));
+
+    int rowIndex = 0;
+    for (byte[] cell : values) {
+      Text row = new Text("row_" + String.format("%1$05d", rowIndex));
+      long lockid = table.startUpdate(row);
+
+      try {
+        table.put(lockid, TEXT_INPUT_COLUMN, cell);
+        table.commit(lockid, System.currentTimeMillis());
+        // -1 marks the update as committed, so it is not aborted below
+        lockid = -1;
+      } finally {
+        if (lockid != -1) {
+          table.abort(lockid);
+        }
+      }
+      rowIndex++;
+    }
+
+    LOG.info("Print table contents before map/reduce");
+    scanTable(conf, SINGLE_REGION_TABLE_NAME);
+
+    @SuppressWarnings("deprecation")
+    MiniMRCluster mr = new MiniMRCluster(2, fs.getUri().toString(), 1);
+
+    try {
+      JobConf job = new JobConf(conf, TestTableMapReduce.class);
+      job.setJobName("process column contents");
+      job.setNumMapTasks(1);
+      job.setNumReduceTasks(1);
+
+      TableMap.initJob(SINGLE_REGION_TABLE_NAME, INPUT_COLUMN,
+          ProcessContentsMapper.class, job);
+
+      TableReduce.initJob(SINGLE_REGION_TABLE_NAME,
+          IdentityTableReduce.class, job);
+
+      JobClient.runJob(job);
+
+    } finally {
+      mr.shutdown();
+    }
+
+    LOG.info("Print table contents after map/reduce");
+    scanTable(conf, SINGLE_REGION_TABLE_NAME);
+
+    // verify map-reduce results
+    verify(conf, SINGLE_REGION_TABLE_NAME);
+  }
+  
+  /*
+   * Creates the multi region test table, populates it until it spans more
+   * than one region, runs the map/reduce job over it on a mini MR cluster
+   * and verifies the result.
+   * @throws IOException
+   */
+  private void localTestMultiRegionTable() throws IOException {
+    HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
+    desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
+    desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
+
+    // Create a table.
+    new HBaseAdmin(this.conf).createTable(desc);
+
+    // Populate a table into multiple regions
+    MultiRegionTable.makeMultiRegionTable(conf, hCluster, fs,
+        MULTI_REGION_TABLE_NAME, INPUT_COLUMN);
+
+    // Verify table indeed has multiple regions
+    HTable table = new HTable(conf, new Text(MULTI_REGION_TABLE_NAME));
+    int regionCount = table.getStartKeys().length;
+    assertTrue(regionCount > 1);
+
+    @SuppressWarnings("deprecation")
+    MiniMRCluster mr = new MiniMRCluster(2, fs.getUri().toString(), 1);
+
+    try {
+      JobConf job = new JobConf(conf, TestTableMapReduce.class);
+      job.setJobName("process column contents");
+      job.setNumMapTasks(2);
+      job.setNumReduceTasks(1);
+
+      TableMap.initJob(MULTI_REGION_TABLE_NAME, INPUT_COLUMN,
+          ProcessContentsMapper.class, job);
+
+      TableReduce.initJob(MULTI_REGION_TABLE_NAME,
+          IdentityTableReduce.class, job);
+
+      JobClient.runJob(job);
+
+    } finally {
+      mr.shutdown();
+    }
+
+    // verify map-reduce results
+    verify(conf, MULTI_REGION_TABLE_NAME);
+  }
+
+  /*
+   * Scans the named table from its first row and logs every row key
+   * together with the values of the input and output columns.
+   */
+  private void scanTable(Configuration conf, String tableName)
+  throws IOException {
+    HTable table = new HTable(conf, new Text(tableName));
+    HScannerInterface scanner = table.obtainScanner(
+        new Text[] { TEXT_INPUT_COLUMN, TEXT_OUTPUT_COLUMN },
+        HConstants.EMPTY_START_ROW);
+
+    try {
+      HStoreKey rowKey = new HStoreKey();
+      TreeMap<Text, byte[]> cells = new TreeMap<Text, byte[]>();
+
+      while (scanner.next(rowKey, cells)) {
+        LOG.info("row: " + rowKey.getRow());
+
+        for (Map.Entry<Text, byte[]> cell : cells.entrySet()) {
+          String text = new String(cell.getValue(), HConstants.UTF8_ENCODING);
+          LOG.info(" column: " + cell.getKey() + " value: "
+              + text);
+        }
+      }
+
+    } finally {
+      scanner.close();
+    }
+  }
+
+  @SuppressWarnings("null")
+  private void verify(Configuration conf, String tableName) throws IOException {
+    HTable table = new HTable(conf, new Text(tableName));
+    
+    Text[] columns = {
+        TEXT_INPUT_COLUMN,
+        TEXT_OUTPUT_COLUMN
+    };
+    HScannerInterface scanner =
+      table.obtainScanner(columns, HConstants.EMPTY_START_ROW);
+    
+    try {
+      HStoreKey key = new HStoreKey();
+      TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+      
+      while(scanner.next(key, results)) {
+        byte[] firstValue = null;
+        byte[] secondValue = null;
+        int count = 0;
+        
+        for(Map.Entry<Text, byte[]> e: results.entrySet()) {
+          if (count == 0)
+            firstValue = e.getValue();
+          if (count == 1)
+            secondValue = e.getValue();
+          count++;
+        }
+        
+        // verify second value is the reverse of the first
+        assertNotNull(firstValue);
+        assertNotNull(secondValue);
+        assertEquals(firstValue.length, secondValue.length);
+        for (int i=0; i<firstValue.length; i++) {
+          assertEquals(firstValue[i], secondValue[firstValue.length-i-1]);
+        }
+      }
+      
+    } finally {
+      scanner.close();
+    }
+  }
+}



Mime
View raw message