From: stack@apache.org
To: hadoop-commits@lucene.apache.org
Subject: svn commit: r580399 - in /lucene/hadoop/trunk/src/contrib/hbase/src: java/org/apache/hadoop/hbase/mapred/ test/org/apache/hadoop/hbase/ test/org/apache/hadoop/hbase/mapred/
Date: Fri, 28 Sep 2007 16:12:25 -0000
Message-Id: <20070928161226.3B7451A9832@eris.apache.org>

Author: stack
Date: Fri Sep 28 09:12:24 2007
New Revision: 580399

URL: http://svn.apache.org/viewvc?rev=580399&view=rev
Log:
HADOOP-1913 Build a Lucene index on an HBase table
Files I failed to add/delete on original commit

Added:
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
Removed:
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java?rev=580399&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java Fri Sep 28 09:12:24 2007
@@ -0,0 +1,184 @@
+/**
+ * Copyright 2007
+ * The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Example table column indexing class. Runs a mapreduce job to index
+ * specified table columns.
+ * <ul>
+ * <li>Each row is modeled as a Lucene document: row key is indexed in
+ * its untokenized form, column name-value pairs are Lucene field name-value
+ * pairs.</li>
+ * <li>A file passed on command line is used to populate an
+ * {@link IndexConfiguration} which is used to set various Lucene parameters,
+ * specify whether to optimize an index and which columns to index and/or
+ * store, in tokenized or untokenized form, etc. For an example, see the
+ * createIndexConfContent method in TestTableIndex.</li>
+ * <li>The number of reduce tasks decides the number of indexes (partitions).
+ * The index(es) is stored in the output path of job configuration.</li>
+ * <li>The index build process is done in the reduce phase. Users can use
+ * the map phase to join rows from different tables or to pre-parse/analyze
+ * column content, etc.</li>
+ * </ul>
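+ * <p>
+ * A minimal usage sketch (the table and column names come from
+ * TestTableIndex; the configuration file and index directory names are only
+ * illustrative), equivalent to invoking the class from the command line with
+ * the same arguments:
+ * <pre>
+ *   new BuildTableIndex().run(new String[] {
+ *     "-m", "2", "-r", "1",
+ *     "-indexConf", "index.conf",
+ *     "-indexDir", "/index/moretest",
+ *     "-table", "moretest",
+ *     "-columns", "contents:"
+ *   });
+ * </pre>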
+ */
+public class BuildTableIndex {
+  private static final String USAGE = "Usage: BuildTableIndex " +
+    "-m <numMapTasks> -r <numReduceTasks>\n  -indexConf <iconfFile> " +
+    "-indexDir <indexDir>\n  -table <tableName> -columns <columnName1> " +
+    "[<columnName2> ...]";
+
+  private static void printUsage(String message) {
+    System.err.println(message);
+    System.err.println(USAGE);
+    System.exit(-1);
+  }
+
+  public BuildTableIndex() {
+    super();
+  }
+
+  public void run(String[] args) throws IOException {
+    if (args.length < 6) {
+      printUsage("Too few arguments");
+    }
+
+    int numMapTasks = 1;
+    int numReduceTasks = 1;
+    String iconfFile = null;
+    String indexDir = null;
+    String tableName = null;
+    StringBuffer columnNames = null;
+
+    // parse args
+    for (int i = 0; i < args.length - 1; i++) {
+      if ("-m".equals(args[i])) {
+        numMapTasks = Integer.parseInt(args[++i]);
+      } else if ("-r".equals(args[i])) {
+        numReduceTasks = Integer.parseInt(args[++i]);
+      } else if ("-indexConf".equals(args[i])) {
+        iconfFile = args[++i];
+      } else if ("-indexDir".equals(args[i])) {
+        indexDir = args[++i];
+      } else if ("-table".equals(args[i])) {
+        tableName = args[++i];
+      } else if ("-columns".equals(args[i])) {
+        columnNames = new StringBuffer(args[++i]);
+        while (i + 1 < args.length && !args[i + 1].startsWith("-")) {
+          columnNames.append(" ");
+          columnNames.append(args[++i]);
+        }
+      } else {
+        printUsage("Unsupported option " + args[i]);
+      }
+    }
+
+    if (indexDir == null || tableName == null || columnNames == null) {
+      printUsage("Index directory, table name and at least one column must " +
+        "be specified");
+    }
+
+    Configuration conf = new HBaseConfiguration();
+    if (iconfFile != null) {
+      // set index configuration content from a file
+      String content = readContent(iconfFile);
+      IndexConfiguration iconf = new IndexConfiguration();
+      // purely to validate, exception will be thrown if not valid
+      iconf.addFromXML(content);
+      conf.set("hbase.index.conf", content);
+    }
+
+    JobConf jobConf = createJob(conf, numMapTasks, numReduceTasks, indexDir,
+      tableName, columnNames.toString());
+    JobClient.runJob(jobConf);
+  }
+
+  public JobConf createJob(Configuration conf, int numMapTasks,
+      int numReduceTasks, String indexDir, String tableName,
+      String columnNames) {
+    JobConf jobConf = new JobConf(conf, BuildTableIndex.class);
+    jobConf.setJobName("build index for table " + tableName);
+    jobConf.setNumMapTasks(numMapTasks);
+    // number of indexes to partition into
+    jobConf.setNumReduceTasks(numReduceTasks);
+
+    // use identity map (a waste, but just as an example)
+    IdentityTableMap.initJob(tableName, columnNames, IdentityTableMap.class,
+      jobConf);
+
+    // use IndexTableReduce to build a Lucene index
+    jobConf.setReducerClass(IndexTableReduce.class);
+    jobConf.setOutputPath(new Path(indexDir));
+    jobConf.setOutputFormat(IndexOutputFormat.class);
+
+    return jobConf;
+  }
+
+  /*
+   * Read xml file of indexing configurations. The xml format is similar to
+   * hbase-default.xml and hadoop-default.xml. For an example configuration,
+   * see the createIndexConfContent method in TestTableIndex.
+   * @param fileName File to read.
+   * @return XML configuration read from file
+   * @throws IOException
+   */
+  private String readContent(String fileName) throws IOException {
+    File file = new File(fileName);
+    int length = (int) file.length();
+    if (length == 0) {
+      printUsage("Index configuration file " + fileName + " does not exist");
+    }
+
+    int bytesRead = 0;
+    byte[] bytes = new byte[length];
+    FileInputStream fis = new FileInputStream(file);
+
+    try {
+      // read entire file into content
+      while (bytesRead < length) {
+        int read = fis.read(bytes, bytesRead, length - bytesRead);
+        if (read > 0) {
+          bytesRead += read;
+        } else {
+          break;
+        }
+      }
+    } finally {
+      fis.close();
+    }
+
+    return new String(bytes, 0, bytesRead, HConstants.UTF8_ENCODING);
+  }
+
+  public static void main(String[] args) throws IOException {
+    BuildTableIndex build = new BuildTableIndex();
+    build.run(args);
+  }
+}
\ No newline at end of file

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java?rev=580399&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java Fri Sep 28 09:12:24 2007
@@ -0,0 +1,418 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase.mapred; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.StringWriter; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.w3c.dom.Text; + +/** + * Configuration parameters for building a Lucene index + */ +public class IndexConfiguration extends Configuration { + private static final Log LOG = LogFactory.getLog(IndexConfiguration.class); + + static final String HBASE_COLUMN_NAME = "hbase.column.name"; + static final String HBASE_COLUMN_STORE = "hbase.column.store"; + static final String HBASE_COLUMN_INDEX = "hbase.column.index"; + static final String HBASE_COLUMN_TOKENIZE = "hbase.column.tokenize"; + static final String HBASE_COLUMN_BOOST = "hbase.column.boost"; + static final String HBASE_COLUMN_OMIT_NORMS = "hbase.column.omit.norms"; + static final String HBASE_INDEX_ROWKEY_NAME = "hbase.index.rowkey.name"; + static final String HBASE_INDEX_ANALYZER_NAME = "hbase.index.analyzer.name"; + static final String HBASE_INDEX_MAX_BUFFERED_DOCS = + "hbase.index.max.buffered.docs"; + static final String HBASE_INDEX_MAX_BUFFERED_DELS = + "hbase.index.max.buffered.dels"; + static final String HBASE_INDEX_MAX_FIELD_LENGTH = + "hbase.index.max.field.length"; + static final String HBASE_INDEX_MAX_MERGE_DOCS = + "hbase.index.max.merge.docs"; + static final String HBASE_INDEX_MERGE_FACTOR = "hbase.index.merge.factor"; + // double ramBufferSizeMB; + static final String HBASE_INDEX_SIMILARITY_NAME = + "hbase.index.similarity.name"; + static final String HBASE_INDEX_USE_COMPOUND_FILE = + "hbase.index.use.compound.file"; + static final String HBASE_INDEX_OPTIMIZE = "hbase.index.optimize"; + + public static class ColumnConf extends Properties { + boolean getBoolean(String name, boolean defaultValue) { + String valueString = getProperty(name); + if ("true".equals(valueString)) + return true; + else if ("false".equals(valueString)) + return false; + else + return defaultValue; + } + + void setBoolean(String name, boolean value) { + setProperty(name, Boolean.toString(value)); + } + + float getFloat(String name, float defaultValue) { + String valueString = getProperty(name); + if (valueString == null) + return defaultValue; + try { + return Float.parseFloat(valueString); + } catch (NumberFormatException e) { + return defaultValue; + } + } + + void setFloat(String name, float value) { + setProperty(name, Float.toString(value)); + } + } + + private HashMap columnMap = new HashMap(); + + public Iterator columnNameIterator() { + return columnMap.keySet().iterator(); + } + + public boolean isIndex(String columnName) { + return getColumn(columnName).getBoolean(HBASE_COLUMN_INDEX, true); + } + + public void setIndex(String columnName, boolean index) { + getColumn(columnName).setBoolean(HBASE_COLUMN_INDEX, index); + } + + public boolean isStore(String columnName) { + return getColumn(columnName).getBoolean(HBASE_COLUMN_STORE, false); + 
} + + public void setStore(String columnName, boolean store) { + getColumn(columnName).setBoolean(HBASE_COLUMN_STORE, store); + } + + public boolean isTokenize(String columnName) { + return getColumn(columnName).getBoolean(HBASE_COLUMN_TOKENIZE, true); + } + + public void setTokenize(String columnName, boolean tokenize) { + getColumn(columnName).setBoolean(HBASE_COLUMN_TOKENIZE, tokenize); + } + + public float getBoost(String columnName) { + return getColumn(columnName).getFloat(HBASE_COLUMN_BOOST, 1.0f); + } + + public void setBoost(String columnName, float boost) { + getColumn(columnName).setFloat(HBASE_COLUMN_BOOST, boost); + } + + public boolean isOmitNorms(String columnName) { + return getColumn(columnName).getBoolean(HBASE_COLUMN_OMIT_NORMS, true); + } + + public void setOmitNorms(String columnName, boolean omitNorms) { + getColumn(columnName).setBoolean(HBASE_COLUMN_OMIT_NORMS, omitNorms); + } + + private ColumnConf getColumn(String columnName) { + ColumnConf column = columnMap.get(columnName); + if (column == null) { + column = new ColumnConf(); + columnMap.put(columnName, column); + } + return column; + } + + public String getAnalyzerName() { + return get(HBASE_INDEX_ANALYZER_NAME, + "org.apache.lucene.analysis.standard.StandardAnalyzer"); + } + + public void setAnalyzerName(String analyzerName) { + set(HBASE_INDEX_ANALYZER_NAME, analyzerName); + } + + public int getMaxBufferedDeleteTerms() { + return getInt(HBASE_INDEX_MAX_BUFFERED_DELS, 1000); + } + + public void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) { + setInt(HBASE_INDEX_MAX_BUFFERED_DELS, maxBufferedDeleteTerms); + } + + public int getMaxBufferedDocs() { + return getInt(HBASE_INDEX_MAX_BUFFERED_DOCS, 10); + } + + public void setMaxBufferedDocs(int maxBufferedDocs) { + setInt(HBASE_INDEX_MAX_BUFFERED_DOCS, maxBufferedDocs); + } + + public int getMaxFieldLength() { + return getInt(HBASE_INDEX_MAX_FIELD_LENGTH, Integer.MAX_VALUE); + } + + public void setMaxFieldLength(int maxFieldLength) { + setInt(HBASE_INDEX_MAX_FIELD_LENGTH, maxFieldLength); + } + + public int getMaxMergeDocs() { + return getInt(HBASE_INDEX_MAX_MERGE_DOCS, Integer.MAX_VALUE); + } + + public void setMaxMergeDocs(int maxMergeDocs) { + setInt(HBASE_INDEX_MAX_MERGE_DOCS, maxMergeDocs); + } + + public int getMergeFactor() { + return getInt(HBASE_INDEX_MERGE_FACTOR, 10); + } + + public void setMergeFactor(int mergeFactor) { + setInt(HBASE_INDEX_MERGE_FACTOR, mergeFactor); + } + + public String getRowkeyName() { + return get(HBASE_INDEX_ROWKEY_NAME, "ROWKEY"); + } + + public void setRowkeyName(String rowkeyName) { + set(HBASE_INDEX_ROWKEY_NAME, rowkeyName); + } + + public String getSimilarityName() { + return get(HBASE_INDEX_SIMILARITY_NAME, null); + } + + public void setSimilarityName(String similarityName) { + set(HBASE_INDEX_SIMILARITY_NAME, similarityName); + } + + public boolean isUseCompoundFile() { + return getBoolean(HBASE_INDEX_USE_COMPOUND_FILE, false); + } + + public void setUseCompoundFile(boolean useCompoundFile) { + setBoolean(HBASE_INDEX_USE_COMPOUND_FILE, useCompoundFile); + } + + public boolean doOptimize() { + return getBoolean(HBASE_INDEX_OPTIMIZE, true); + } + + public void setDoOptimize(boolean doOptimize) { + setBoolean(HBASE_INDEX_OPTIMIZE, doOptimize); + } + + public void addFromXML(String content) { + try { + DocumentBuilder builder = DocumentBuilderFactory.newInstance() + .newDocumentBuilder(); + + Document doc = builder + .parse(new ByteArrayInputStream(content.getBytes())); + + Element root = doc.getDocumentElement(); + if 
(!"configuration".equals(root.getTagName())) { + LOG.fatal("bad conf file: top-level element not "); + } + + NodeList props = root.getChildNodes(); + for (int i = 0; i < props.getLength(); i++) { + Node propNode = props.item(i); + if (!(propNode instanceof Element)) { + continue; + } + + Element prop = (Element) propNode; + if ("property".equals(prop.getTagName())) { + propertyFromXML(prop, null); + } else if ("column".equals(prop.getTagName())) { + columnConfFromXML(prop); + } else { + LOG.warn("bad conf content: element neither nor "); + } + } + } catch (Exception e) { + LOG.fatal("error parsing conf content: " + e); + throw new RuntimeException(e); + } + } + + private void propertyFromXML(Element prop, Properties properties) { + NodeList fields = prop.getChildNodes(); + String attr = null; + String value = null; + + for (int j = 0; j < fields.getLength(); j++) { + Node fieldNode = fields.item(j); + if (!(fieldNode instanceof Element)) { + continue; + } + + Element field = (Element) fieldNode; + if ("name".equals(field.getTagName())) { + attr = ((Text) field.getFirstChild()).getData(); + } + if ("value".equals(field.getTagName()) && field.hasChildNodes()) { + value = ((Text) field.getFirstChild()).getData(); + } + } + + if (attr != null && value != null) { + if (properties == null) { + set(attr, value); + } else { + properties.setProperty(attr, value); + } + } + } + + private void columnConfFromXML(Element column) { + ColumnConf columnConf = new ColumnConf(); + NodeList props = column.getChildNodes(); + for (int i = 0; i < props.getLength(); i++) { + Node propNode = props.item(i); + if (!(propNode instanceof Element)) { + continue; + } + + Element prop = (Element) propNode; + if ("property".equals(prop.getTagName())) { + propertyFromXML(prop, columnConf); + } else { + LOG.warn("bad conf content: element not "); + } + } + + if (columnConf.getProperty(HBASE_COLUMN_NAME) != null) { + columnMap.put(columnConf.getProperty(HBASE_COLUMN_NAME), columnConf); + } else { + LOG.warn("bad column conf: name not specified"); + } + } + + public void write(OutputStream out) throws IOException { + try { + Document doc = writeDocument(); + DOMSource source = new DOMSource(doc); + StreamResult result = new StreamResult(out); + TransformerFactory transFactory = TransformerFactory.newInstance(); + Transformer transformer = transFactory.newTransformer(); + transformer.transform(source, result); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Document writeDocument() { + Iterator> iter = iterator(); + try { + Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder() + .newDocument(); + Element conf = doc.createElement("configuration"); + doc.appendChild(conf); + conf.appendChild(doc.createTextNode("\n")); + + Map.Entry entry; + while (iter.hasNext()) { + entry = iter.next(); + String name = entry.getKey(); + String value = entry.getValue(); + writeProperty(doc, conf, name, value); + } + + Iterator columnIter = columnNameIterator(); + while (columnIter.hasNext()) { + writeColumn(doc, conf, columnIter.next()); + } + + return doc; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void writeProperty(Document doc, Element parent, String name, + String value) { + Element propNode = doc.createElement("property"); + parent.appendChild(propNode); + + Element nameNode = doc.createElement("name"); + nameNode.appendChild(doc.createTextNode(name)); + propNode.appendChild(nameNode); + + Element valueNode = doc.createElement("value"); + 
valueNode.appendChild(doc.createTextNode(value)); + propNode.appendChild(valueNode); + + parent.appendChild(doc.createTextNode("\n")); + } + + private void writeColumn(Document doc, Element parent, String columnName) { + Element column = doc.createElement("column"); + parent.appendChild(column); + column.appendChild(doc.createTextNode("\n")); + + ColumnConf columnConf = getColumn(columnName); + for (Map.Entry entry : columnConf.entrySet()) { + if (entry.getKey() instanceof String + && entry.getValue() instanceof String) { + writeProperty(doc, column, (String) entry.getKey(), (String) entry + .getValue()); + } + } + } + + public String toString() { + StringWriter writer = new StringWriter(); + try { + Document doc = writeDocument(); + DOMSource source = new DOMSource(doc); + StreamResult result = new StreamResult(writer); + TransformerFactory transFactory = TransformerFactory.newInstance(); + Transformer transformer = transFactory.newTransformer(); + transformer.transform(source, result); + } catch (Exception e) { + throw new RuntimeException(e); + } + return writer.toString(); + } +} \ No newline at end of file Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java?rev=580399&view=auto ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java (added) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java Fri Sep 28 09:12:24 2007 @@ -0,0 +1,163 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormatBase; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.search.Similarity; + +/** + * Create a local index, unwrap Lucene documents created by reduce, add them to + * the index, and copy the index to the destination. 
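+ * <p>
+ * A minimal wiring sketch, mirroring BuildTableIndex.createJob (the output
+ * path and the indexConfXml variable below are only illustrative): the
+ * reduce side must emit LuceneDocumentWrapper values, and an optional
+ * serialized IndexConfiguration is passed via the "hbase.index.conf"
+ * property.
+ * <pre>
+ *   JobConf jobConf = new JobConf(conf, BuildTableIndex.class);
+ *   jobConf.setReducerClass(IndexTableReduce.class);
+ *   jobConf.setOutputFormat(IndexOutputFormat.class);
+ *   jobConf.setOutputPath(new Path("/index/moretest")); // illustrative
+ *   jobConf.set("hbase.index.conf", indexConfXml);      // optional
+ * </pre>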
+ */ +public class IndexOutputFormat extends + OutputFormatBase { + static final Log LOG = LogFactory.getLog(IndexOutputFormat.class); + + @Override + public RecordWriter getRecordWriter( + final FileSystem fs, JobConf job, String name, final Progressable progress) + throws IOException { + + final Path perm = new Path(job.getOutputPath(), name); + final Path temp = job.getLocalPath("index/_" + + Integer.toString(new Random().nextInt())); + + LOG.info("To index into " + perm); + + // delete old, if any + fs.delete(perm); + + final IndexConfiguration indexConf = new IndexConfiguration(); + String content = job.get("hbase.index.conf"); + if (content != null) { + indexConf.addFromXML(content); + } + + String analyzerName = indexConf.getAnalyzerName(); + Analyzer analyzer; + try { + Class analyzerClass = Class.forName(analyzerName); + analyzer = (Analyzer) analyzerClass.newInstance(); + } catch (Exception e) { + throw new IOException("Error in creating an analyzer object " + + analyzerName); + } + + // build locally first + final IndexWriter writer = new IndexWriter(fs.startLocalOutput(perm, temp) + .toString(), analyzer, true); + + // no delete, so no need for maxBufferedDeleteTerms + writer.setMaxBufferedDocs(indexConf.getMaxBufferedDocs()); + writer.setMaxFieldLength(indexConf.getMaxFieldLength()); + writer.setMaxMergeDocs(indexConf.getMaxMergeDocs()); + writer.setMergeFactor(indexConf.getMergeFactor()); + String similarityName = indexConf.getSimilarityName(); + if (similarityName != null) { + try { + Class similarityClass = Class.forName(similarityName); + Similarity similarity = (Similarity) similarityClass.newInstance(); + writer.setSimilarity(similarity); + } catch (Exception e) { + throw new IOException("Error in creating a similarty object " + + similarityName); + } + } + writer.setUseCompoundFile(indexConf.isUseCompoundFile()); + + return new RecordWriter() { + private boolean closed; + private long docCount = 0; + + public void write(@SuppressWarnings("unused") Text key, + LuceneDocumentWrapper value) + throws IOException { + // unwrap and index doc + Document doc = value.get(); + if (LOG.isDebugEnabled()) { + LOG.debug(" Indexing [" + doc + "]"); + } + + writer.addDocument(doc); + docCount++; + progress.progress(); + } + + public void close(final Reporter reporter) throws IOException { + // spawn a thread to give progress heartbeats + Thread prog = new Thread() { + public void run() { + while (!closed) { + try { + reporter.setStatus("closing"); + Thread.sleep(1000); + } catch (InterruptedException e) { + continue; + } catch (Throwable e) { + return; + } + } + } + }; + + try { + prog.start(); + + // optimize index + if (indexConf.doOptimize()) { + if (LOG.isInfoEnabled()) { + LOG.info("Optimizing index."); + } + writer.optimize(); + } + + // close index + writer.close(); + if (LOG.isInfoEnabled()) { + LOG.info("Done indexing " + docCount + " docs."); + } + + // copy to perm destination in dfs + fs.completeLocalOutput(perm, temp); + if (LOG.isInfoEnabled()) { + LOG.info("Copy done."); + } + } finally { + closed = true; + } + } + }; + } +} \ No newline at end of file Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java?rev=580399&view=auto ============================================================================== --- 
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java (added) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java Fri Sep 28 09:12:24 2007 @@ -0,0 +1,107 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; + +/** + * Construct a Lucene document per row, which is consumed by IndexOutputFormat + * to build a Lucene index + */ +public class IndexTableReduce extends MapReduceBase implements + Reducer { + private static final Logger LOG = Logger.getLogger(IndexTableReduce.class); + + private IndexConfiguration indexConf; + + public void configure(JobConf job) { + super.configure(job); + indexConf = new IndexConfiguration(); + String content = job.get("hbase.index.conf"); + if (content != null) { + indexConf.addFromXML(content); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Index conf: " + indexConf); + } + } + + public void close() throws IOException { + super.close(); + } + + public void reduce(Text key, Iterator values, + OutputCollector output, Reporter reporter) + throws IOException { + if (!values.hasNext()) { + return; + } + + Document doc = new Document(); + + // index and store row key, row key already UTF-8 encoded + Field keyField = new Field(indexConf.getRowkeyName(), key.toString(), + Field.Store.YES, Field.Index.UN_TOKENIZED); + keyField.setOmitNorms(true); + doc.add(keyField); + + while (values.hasNext()) { + MapWritable value = values.next(); + + // each column (name-value pair) is a field (name-value pair) + for (Map.Entry entry : value.entrySet()) { + // name is already UTF-8 encoded + String column = ((Text) entry.getKey()).toString(); + byte[] columnValue = ((ImmutableBytesWritable)entry.getValue()).get(); + Field.Store store = indexConf.isStore(column)? + Field.Store.YES: Field.Store.NO; + Field.Index index = indexConf.isIndex(column)? + (indexConf.isTokenize(column)? 
+ Field.Index.TOKENIZED: Field.Index.UN_TOKENIZED): + Field.Index.NO; + + // UTF-8 encode value + Field field = new Field(column, new String(columnValue, + HConstants.UTF8_ENCODING), store, index); + field.setBoost(indexConf.getBoost(column)); + field.setOmitNorms(indexConf.isOmitNorms(column)); + + doc.add(field); + } + } + output.collect(key, new LuceneDocumentWrapper(doc)); + } +} Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java?rev=580399&view=auto ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java (added) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java Fri Sep 28 09:12:24 2007 @@ -0,0 +1,51 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.lucene.document.Document; + +/** + * A utility class used to pass a lucene document from reduce to OutputFormat. + * It doesn't really serialize/deserialize a lucene document. + */ +class LuceneDocumentWrapper implements Writable { + private Document doc; + + public LuceneDocumentWrapper(Document doc) { + this.doc = doc; + } + + public Document get() { + return doc; + } + + public void readFields(DataInput in) throws IOException { + // intentionally left blank + } + + public void write(DataOutput out) throws IOException { + // intentionally left blank + } +} \ No newline at end of file Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java?rev=580399&view=auto ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java (added) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java Fri Sep 28 09:12:24 2007 @@ -0,0 +1,297 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; + +import junit.framework.TestSuite; +import junit.textui.TestRunner; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.dfs.MiniDFSCluster; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseAdmin; +import org.apache.hadoop.hbase.HBaseTestCase; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HScannerInterface; +import org.apache.hadoop.hbase.HStoreKey; +import org.apache.hadoop.hbase.HTable; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.MultiRegionTable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MultiSearcher; +import org.apache.lucene.search.Searchable; +import org.apache.lucene.search.Searcher; +import org.apache.lucene.search.TermQuery; + +/** + * Test Map/Reduce job to build index over HBase table + */ +public class TestTableIndex extends HBaseTestCase { + private static final Log LOG = LogFactory.getLog(TestTableIndex.class); + + static final String TABLE_NAME = "moretest"; + static final String INPUT_COLUMN = "contents:"; + static final Text TEXT_INPUT_COLUMN = new Text(INPUT_COLUMN); + static final String OUTPUT_COLUMN = "text:"; + static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN); + static final String ROWKEY_NAME = "key"; + static final String INDEX_DIR = "testindex"; + + private HTableDescriptor desc; + + private MiniDFSCluster dfsCluster = null; + private FileSystem fs; + private Path dir; + private MiniHBaseCluster hCluster = null; + + @Override + public void setUp() throws Exception { + super.setUp(); + + // This size should make it so we always split using the addContent + // below. After adding all data, the first region is 1.3M + conf.setLong("hbase.hregion.max.filesize", 256 * 1024); + + desc = new HTableDescriptor(TABLE_NAME); + desc.addFamily(new HColumnDescriptor(INPUT_COLUMN)); + desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN)); + + dfsCluster = new MiniDFSCluster(conf, 1, true, (String[]) null); + try { + fs = dfsCluster.getFileSystem(); + + dir = new Path("/hbase"); + fs.mkdirs(dir); + + // Start up HBase cluster + hCluster = new MiniHBaseCluster(conf, 1, dfsCluster); + + // Create a table. 
+ HBaseAdmin admin = new HBaseAdmin(conf); + admin.createTable(desc); + + // Populate a table into multiple regions + MultiRegionTable.makeMultiRegionTable(conf, hCluster, null, TABLE_NAME, + INPUT_COLUMN); + + // Verify table indeed has multiple regions + HTable table = new HTable(conf, new Text(TABLE_NAME)); + Text[] startKeys = table.getStartKeys(); + assertTrue(startKeys.length > 1); + } catch (Exception e) { + if (dfsCluster != null) { + dfsCluster.shutdown(); + dfsCluster = null; + } + throw e; + } + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + + if (hCluster != null) { + hCluster.shutdown(); + } + + if (dfsCluster != null) { + dfsCluster.shutdown(); + } + } + + /** + * Test HBase map/reduce + * + * @throws IOException + */ + @SuppressWarnings("static-access") + public void testTableIndex() throws IOException { + long firstK = 32; + LOG.info("Print table contents before map/reduce"); + scanTable(conf, firstK); + + @SuppressWarnings("deprecation") + MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1); + + // set configuration parameter for index build + conf.set("hbase.index.conf", createIndexConfContent()); + + try { + JobConf jobConf = new JobConf(conf, TestTableIndex.class); + jobConf.setJobName("index column contents"); + jobConf.setNumMapTasks(2); + // number of indexes to partition into + jobConf.setNumReduceTasks(1); + + // use identity map (a waste, but just as an example) + IdentityTableMap.initJob(TABLE_NAME, INPUT_COLUMN, + IdentityTableMap.class, jobConf); + + // use IndexTableReduce to build a Lucene index + jobConf.setReducerClass(IndexTableReduce.class); + jobConf.setOutputPath(new Path(INDEX_DIR)); + jobConf.setOutputFormat(IndexOutputFormat.class); + + JobClient.runJob(jobConf); + + } finally { + mrCluster.shutdown(); + } + + LOG.info("Print table contents after map/reduce"); + scanTable(conf, firstK); + + // verify index results + verify(conf); + } + + private String createIndexConfContent() { + StringBuffer buffer = new StringBuffer(); + buffer.append("" + + "hbase.column.name" + INPUT_COLUMN + + ""); + buffer.append("hbase.column.store " + + "true"); + buffer.append("hbase.column.index" + + "true"); + buffer.append("hbase.column.tokenize" + + "false"); + buffer.append("hbase.column.boost" + + "3"); + buffer.append("hbase.column.omit.norms" + + "false"); + buffer.append("hbase.index.rowkey.name" + + ROWKEY_NAME + ""); + buffer.append("hbase.index.max.buffered.docs" + + "500"); + buffer.append("hbase.index.max.field.length" + + "10000"); + buffer.append("hbase.index.merge.factor" + + "10"); + buffer.append("hbase.index.use.compound.file" + + "true"); + buffer.append("hbase.index.optimize" + + "true"); + + IndexConfiguration c = new IndexConfiguration(); + c.addFromXML(buffer.toString()); + return c.toString(); + } + + private void scanTable(Configuration c, long firstK) throws IOException { + HTable table = new HTable(c, new Text(TABLE_NAME)); + Text[] columns = { TEXT_INPUT_COLUMN, TEXT_OUTPUT_COLUMN }; + HScannerInterface scanner = table.obtainScanner(columns, + HConstants.EMPTY_START_ROW); + long count = 0; + try { + HStoreKey key = new HStoreKey(); + TreeMap results = new TreeMap(); + while (scanner.next(key, results)) { + if (count < firstK) + LOG.info("row: " + key.getRow()); + for (Map.Entry e : results.entrySet()) { + if (count < firstK) + LOG.info(" column: " + e.getKey() + " value: " + + new String(e.getValue(), HConstants.UTF8_ENCODING)); + } + count++; + } + } finally { + scanner.close(); + } + } 
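+  /*
+   * A minimal sketch (hypothetical helper, not called by this test): the
+   * values used in createIndexConfContent above can also be applied through
+   * the IndexConfiguration setters and then serialized with toString().
+   */
+  private String createIndexConfContentProgrammatically() {
+    IndexConfiguration c = new IndexConfiguration();
+    c.setIndex(INPUT_COLUMN, true);
+    c.setStore(INPUT_COLUMN, true);
+    c.setTokenize(INPUT_COLUMN, false);
+    c.setBoost(INPUT_COLUMN, 3.0f);
+    c.setOmitNorms(INPUT_COLUMN, false);
+    c.setRowkeyName(ROWKEY_NAME);
+    c.setMaxBufferedDocs(500);
+    c.setMaxFieldLength(10000);
+    c.setMergeFactor(10);
+    c.setUseCompoundFile(true);
+    c.setDoOptimize(true);
+    return c.toString();
+  }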
+ + private void verify(Configuration c) throws IOException { + Path localDir = new Path(this.testDir, "index_" + + Integer.toString(new Random().nextInt())); + this.fs.copyToLocalFile(new Path(INDEX_DIR), localDir); + Path [] indexDirs = this.localFs.listPaths(new Path [] {localDir}); + Searcher searcher = null; + HScannerInterface scanner = null; + try { + if (indexDirs.length == 1) { + searcher = new IndexSearcher((new File(indexDirs[0]. + toUri())).getAbsolutePath()); + } else if (indexDirs.length > 1) { + Searchable[] searchers = new Searchable[indexDirs.length]; + for (int i = 0; i < indexDirs.length; i++) { + searchers[i] = new IndexSearcher((new File(indexDirs[i]. + toUri()).getAbsolutePath())); + } + searcher = new MultiSearcher(searchers); + } else { + throw new IOException("no index directory found"); + } + + HTable table = new HTable(c, new Text(TABLE_NAME)); + Text[] columns = { TEXT_INPUT_COLUMN, TEXT_OUTPUT_COLUMN }; + scanner = table.obtainScanner(columns, HConstants.EMPTY_START_ROW); + + HStoreKey key = new HStoreKey(); + TreeMap results = new TreeMap(); + + IndexConfiguration indexConf = new IndexConfiguration(); + String content = c.get("hbase.index.conf"); + if (content != null) { + indexConf.addFromXML(content); + } + String rowkeyName = indexConf.getRowkeyName(); + + int count = 0; + while (scanner.next(key, results)) { + String value = key.getRow().toString(); + Term term = new Term(rowkeyName, value); + int hitCount = searcher.search(new TermQuery(term)).length(); + assertEquals("check row " + value, 1, hitCount); + count++; + } + int maxDoc = searcher.maxDoc(); + assertEquals("check number of rows", count, maxDoc); + } finally { + if (null != searcher) + searcher.close(); + if (null != scanner) + scanner.close(); + } + } + /** + * @param args unused + */ + public static void main(@SuppressWarnings("unused") String[] args) { + TestRunner.run(new TestSuite(TestTableIndex.class)); + } +} \ No newline at end of file Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java?rev=580399&view=auto ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (added) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java Fri Sep 28 09:12:24 2007 @@ -0,0 +1,387 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.dfs.MiniDFSCluster; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.hbase.HBaseAdmin; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HScannerInterface; +import org.apache.hadoop.hbase.HStoreKey; +import org.apache.hadoop.hbase.HTable; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.MultiRegionTable; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.TableMap; +import org.apache.hadoop.hbase.mapred.TableOutputCollector; +import org.apache.hadoop.hbase.mapred.TableReduce; +import org.apache.hadoop.hbase.mapred.IdentityTableReduce; + +/** + * Test Map/Reduce job over HBase tables + */ +public class TestTableMapReduce extends MultiRegionTable { + @SuppressWarnings("hiding") + private static final Log LOG = + LogFactory.getLog(TestTableMapReduce.class.getName()); + + static final String SINGLE_REGION_TABLE_NAME = "srtest"; + static final String MULTI_REGION_TABLE_NAME = "mrtest"; + static final String INPUT_COLUMN = "contents:"; + static final Text TEXT_INPUT_COLUMN = new Text(INPUT_COLUMN); + static final String OUTPUT_COLUMN = "text:"; + static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN); + + private MiniDFSCluster dfsCluster = null; + private FileSystem fs; + private Path dir; + private MiniHBaseCluster hCluster = null; + + private static byte[][] values = null; + + static { + try { + values = new byte[][] { + "0123".getBytes(HConstants.UTF8_ENCODING), + "abcd".getBytes(HConstants.UTF8_ENCODING), + "wxyz".getBytes(HConstants.UTF8_ENCODING), + "6789".getBytes(HConstants.UTF8_ENCODING) + }; + } catch (UnsupportedEncodingException e) { + fail(); + } + } + + /** constructor */ + public TestTableMapReduce() { + super(); + + // Make lease timeout longer, lease checks less frequent + conf.setInt("hbase.master.lease.period", 10 * 1000); + conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000); + } + + /** + * {@inheritDoc} + */ + @Override + public void setUp() throws Exception { + super.setUp(); + // This size is picked so the table is split into two + // after addContent in testMultiRegionTableMapReduce. 
+ conf.setLong("hbase.hregion.max.filesize", 256 * 1024); + dfsCluster = new MiniDFSCluster(conf, 1, true, (String[])null); + try { + fs = dfsCluster.getFileSystem(); + dir = new Path("/hbase"); + fs.mkdirs(dir); + // Start up HBase cluster + hCluster = new MiniHBaseCluster(conf, 1, dfsCluster); + LOG.info("Master is at " + this.conf.get(HConstants.MASTER_ADDRESS)); + } catch (Exception e) { + if (dfsCluster != null) { + dfsCluster.shutdown(); + dfsCluster = null; + } + throw e; + } + } + + /** + * {@inheritDoc} + */ + @Override + public void tearDown() throws Exception { + super.tearDown(); + if(hCluster != null) { + hCluster.shutdown(); + } + + if (dfsCluster != null) { + dfsCluster.shutdown(); + } + + if (fs != null) { + try { + fs.close(); + } catch (IOException e) { + LOG.info("During tear down got a " + e.getMessage()); + } + } + } + + /** + * Pass the given key and processed record reduce + */ + public static class ProcessContentsMapper extends TableMap { + + /** constructor */ + public ProcessContentsMapper() { + super(); + } + + /** + * Pass the key, and reversed value to reduce + * + * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter) + */ + @SuppressWarnings("unchecked") + @Override + public void map(HStoreKey key, MapWritable value, + TableOutputCollector output, + @SuppressWarnings("unused") Reporter reporter) throws IOException { + + Text tKey = key.getRow(); + + if(value.size() != 1) { + throw new IOException("There should only be one input column"); + } + + Text[] keys = value.keySet().toArray(new Text[value.size()]); + if(!keys[0].equals(TEXT_INPUT_COLUMN)) { + throw new IOException("Wrong input column. Expected: " + INPUT_COLUMN + + " but got: " + keys[0]); + } + + // Get the original value and reverse it + + String originalValue = + new String(((ImmutableBytesWritable)value.get(keys[0])).get(), + HConstants.UTF8_ENCODING); + StringBuilder newValue = new StringBuilder(); + for(int i = originalValue.length() - 1; i >= 0; i--) { + newValue.append(originalValue.charAt(i)); + } + + // Now set the value to be collected + + MapWritable outval = new MapWritable(); + outval.put(TEXT_OUTPUT_COLUMN, new ImmutableBytesWritable( + newValue.toString().getBytes(HConstants.UTF8_ENCODING))); + + output.collect(tKey, outval); + } + } + + /** + * Test hbase mapreduce jobs against single region and multi-region tables. + * @throws IOException + */ + public void testTableMapReduce() throws IOException { + localTestSingleRegionTable(); + localTestMultiRegionTable(); + } + + /* + * Test against a single region. + * @throws IOException + */ + private void localTestSingleRegionTable() throws IOException { + HTableDescriptor desc = new HTableDescriptor(SINGLE_REGION_TABLE_NAME); + desc.addFamily(new HColumnDescriptor(INPUT_COLUMN)); + desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN)); + + // Create a table. 
+ HBaseAdmin admin = new HBaseAdmin(this.conf); + admin.createTable(desc); + + // insert some data into the test table + HTable table = new HTable(conf, new Text(SINGLE_REGION_TABLE_NAME)); + + for(int i = 0; i < values.length; i++) { + long lockid = table.startUpdate(new Text("row_" + + String.format("%1$05d", i))); + + try { + table.put(lockid, TEXT_INPUT_COLUMN, values[i]); + table.commit(lockid, System.currentTimeMillis()); + lockid = -1; + } finally { + if (lockid != -1) + table.abort(lockid); + } + } + + LOG.info("Print table contents before map/reduce"); + scanTable(conf, SINGLE_REGION_TABLE_NAME); + + @SuppressWarnings("deprecation") + MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1); + + try { + JobConf jobConf = new JobConf(conf, TestTableMapReduce.class); + jobConf.setJobName("process column contents"); + jobConf.setNumMapTasks(1); + jobConf.setNumReduceTasks(1); + + TableMap.initJob(SINGLE_REGION_TABLE_NAME, INPUT_COLUMN, + ProcessContentsMapper.class, jobConf); + + TableReduce.initJob(SINGLE_REGION_TABLE_NAME, + IdentityTableReduce.class, jobConf); + + JobClient.runJob(jobConf); + + } finally { + mrCluster.shutdown(); + } + + LOG.info("Print table contents after map/reduce"); + scanTable(conf, SINGLE_REGION_TABLE_NAME); + + // verify map-reduce results + verify(conf, SINGLE_REGION_TABLE_NAME); + } + + /* + * Test against multiple regions. + * @throws IOException + */ + private void localTestMultiRegionTable() throws IOException { + HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME); + desc.addFamily(new HColumnDescriptor(INPUT_COLUMN)); + desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN)); + + // Create a table. + HBaseAdmin admin = new HBaseAdmin(this.conf); + admin.createTable(desc); + + // Populate a table into multiple regions + MultiRegionTable.makeMultiRegionTable(conf, hCluster, fs, + MULTI_REGION_TABLE_NAME, INPUT_COLUMN); + + // Verify table indeed has multiple regions + HTable table = new HTable(conf, new Text(MULTI_REGION_TABLE_NAME)); + Text[] startKeys = table.getStartKeys(); + assertTrue(startKeys.length > 1); + + @SuppressWarnings("deprecation") + MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1); + + try { + JobConf jobConf = new JobConf(conf, TestTableMapReduce.class); + jobConf.setJobName("process column contents"); + jobConf.setNumMapTasks(2); + jobConf.setNumReduceTasks(1); + + TableMap.initJob(MULTI_REGION_TABLE_NAME, INPUT_COLUMN, + ProcessContentsMapper.class, jobConf); + + TableReduce.initJob(MULTI_REGION_TABLE_NAME, + IdentityTableReduce.class, jobConf); + + JobClient.runJob(jobConf); + + } finally { + mrCluster.shutdown(); + } + + // verify map-reduce results + verify(conf, MULTI_REGION_TABLE_NAME); + } + + private void scanTable(Configuration conf, String tableName) + throws IOException { + HTable table = new HTable(conf, new Text(tableName)); + + Text[] columns = { + TEXT_INPUT_COLUMN, + TEXT_OUTPUT_COLUMN + }; + HScannerInterface scanner = + table.obtainScanner(columns, HConstants.EMPTY_START_ROW); + + try { + HStoreKey key = new HStoreKey(); + TreeMap results = new TreeMap(); + + while(scanner.next(key, results)) { + LOG.info("row: " + key.getRow()); + + for(Map.Entry e: results.entrySet()) { + LOG.info(" column: " + e.getKey() + " value: " + + new String(e.getValue(), HConstants.UTF8_ENCODING)); + } + } + + } finally { + scanner.close(); + } + } + + @SuppressWarnings("null") + private void verify(Configuration conf, String tableName) throws IOException { + HTable table = new 
HTable(conf, new Text(tableName)); + + Text[] columns = { + TEXT_INPUT_COLUMN, + TEXT_OUTPUT_COLUMN + }; + HScannerInterface scanner = + table.obtainScanner(columns, HConstants.EMPTY_START_ROW); + + try { + HStoreKey key = new HStoreKey(); + TreeMap results = new TreeMap(); + + while(scanner.next(key, results)) { + byte[] firstValue = null; + byte[] secondValue = null; + int count = 0; + + for(Map.Entry e: results.entrySet()) { + if (count == 0) + firstValue = e.getValue(); + if (count == 1) + secondValue = e.getValue(); + count++; + } + + // verify second value is the reverse of the first + assertNotNull(firstValue); + assertNotNull(secondValue); + assertEquals(firstValue.length, secondValue.length); + for (int i=0; i