hbase-commits mailing list archives

From apurt...@apache.org
Subject svn commit: r785076 [6/18] - in /hadoop/hbase/trunk_on_hadoop-0.18.3: ./ bin/ conf/ src/java/ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/filter/ src/java/org/apache/hadoop/hbase/io/ src/j...
Date Tue, 16 Jun 2009 04:34:02 GMT
Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/IndexConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/IndexConfiguration.java?rev=785076&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/IndexConfiguration.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/IndexConfiguration.java Tue Jun 16 04:33:56 2009
@@ -0,0 +1,422 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
+
+/**
+ * Configuration parameters for building a Lucene index
+ */
+public class IndexConfiguration extends Configuration {
+  private static final Log LOG = LogFactory.getLog(IndexConfiguration.class);
+
+  static final String HBASE_COLUMN_NAME = "hbase.column.name";
+  static final String HBASE_COLUMN_STORE = "hbase.column.store";
+  static final String HBASE_COLUMN_INDEX = "hbase.column.index";
+  static final String HBASE_COLUMN_TOKENIZE = "hbase.column.tokenize";
+  static final String HBASE_COLUMN_BOOST = "hbase.column.boost";
+  static final String HBASE_COLUMN_OMIT_NORMS = "hbase.column.omit.norms";
+  static final String HBASE_INDEX_ROWKEY_NAME = "hbase.index.rowkey.name";
+  static final String HBASE_INDEX_ANALYZER_NAME = "hbase.index.analyzer.name";
+  static final String HBASE_INDEX_MAX_BUFFERED_DOCS =
+    "hbase.index.max.buffered.docs";
+  static final String HBASE_INDEX_MAX_BUFFERED_DELS =
+    "hbase.index.max.buffered.dels";
+  static final String HBASE_INDEX_MAX_FIELD_LENGTH =
+    "hbase.index.max.field.length";
+  static final String HBASE_INDEX_MAX_MERGE_DOCS =
+    "hbase.index.max.merge.docs";
+  static final String HBASE_INDEX_MERGE_FACTOR = "hbase.index.merge.factor";
+  // double ramBufferSizeMB;
+  static final String HBASE_INDEX_SIMILARITY_NAME =
+    "hbase.index.similarity.name";
+  static final String HBASE_INDEX_USE_COMPOUND_FILE =
+    "hbase.index.use.compound.file";
+  static final String HBASE_INDEX_OPTIMIZE = "hbase.index.optimize";
+
+  public static class ColumnConf extends Properties {
+
+    private static final long serialVersionUID = 7419012290580607821L;
+
+    boolean getBoolean(String name, boolean defaultValue) {
+      String valueString = getProperty(name);
+      if ("true".equals(valueString))
+        return true;
+      else if ("false".equals(valueString))
+        return false;
+      else
+        return defaultValue;
+    }
+
+    void setBoolean(String name, boolean value) {
+      setProperty(name, Boolean.toString(value));
+    }
+
+    float getFloat(String name, float defaultValue) {
+      String valueString = getProperty(name);
+      if (valueString == null)
+        return defaultValue;
+      try {
+        return Float.parseFloat(valueString);
+      } catch (NumberFormatException e) {
+        return defaultValue;
+      }
+    }
+
+    void setFloat(String name, float value) {
+      setProperty(name, Float.toString(value));
+    }
+  }
+
+  private Map<String, ColumnConf> columnMap =
+    new ConcurrentHashMap<String, ColumnConf>();
+
+  public Iterator<String> columnNameIterator() {
+    return columnMap.keySet().iterator();
+  }
+
+  public boolean isIndex(String columnName) {
+    return getColumn(columnName).getBoolean(HBASE_COLUMN_INDEX, true);
+  }
+
+  public void setIndex(String columnName, boolean index) {
+    getColumn(columnName).setBoolean(HBASE_COLUMN_INDEX, index);
+  }
+
+  public boolean isStore(String columnName) {
+    return getColumn(columnName).getBoolean(HBASE_COLUMN_STORE, false);
+  }
+
+  public void setStore(String columnName, boolean store) {
+    getColumn(columnName).setBoolean(HBASE_COLUMN_STORE, store);
+  }
+
+  public boolean isTokenize(String columnName) {
+    return getColumn(columnName).getBoolean(HBASE_COLUMN_TOKENIZE, true);
+  }
+
+  public void setTokenize(String columnName, boolean tokenize) {
+    getColumn(columnName).setBoolean(HBASE_COLUMN_TOKENIZE, tokenize);
+  }
+
+  public float getBoost(String columnName) {
+    return getColumn(columnName).getFloat(HBASE_COLUMN_BOOST, 1.0f);
+  }
+
+  public void setBoost(String columnName, float boost) {
+    getColumn(columnName).setFloat(HBASE_COLUMN_BOOST, boost);
+  }
+
+  public boolean isOmitNorms(String columnName) {
+    return getColumn(columnName).getBoolean(HBASE_COLUMN_OMIT_NORMS, true);
+  }
+
+  public void setOmitNorms(String columnName, boolean omitNorms) {
+    getColumn(columnName).setBoolean(HBASE_COLUMN_OMIT_NORMS, omitNorms);
+  }
+
+  private ColumnConf getColumn(String columnName) {
+    ColumnConf column = columnMap.get(columnName);
+    if (column == null) {
+      column = new ColumnConf();
+      columnMap.put(columnName, column);
+    }
+    return column;
+  }
+
+  public String getAnalyzerName() {
+    return get(HBASE_INDEX_ANALYZER_NAME,
+        "org.apache.lucene.analysis.standard.StandardAnalyzer");
+  }
+
+  public void setAnalyzerName(String analyzerName) {
+    set(HBASE_INDEX_ANALYZER_NAME, analyzerName);
+  }
+
+  public int getMaxBufferedDeleteTerms() {
+    return getInt(HBASE_INDEX_MAX_BUFFERED_DELS, 1000);
+  }
+
+  public void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
+    setInt(HBASE_INDEX_MAX_BUFFERED_DELS, maxBufferedDeleteTerms);
+  }
+
+  public int getMaxBufferedDocs() {
+    return getInt(HBASE_INDEX_MAX_BUFFERED_DOCS, 10);
+  }
+
+  public void setMaxBufferedDocs(int maxBufferedDocs) {
+    setInt(HBASE_INDEX_MAX_BUFFERED_DOCS, maxBufferedDocs);
+  }
+
+  public int getMaxFieldLength() {
+    return getInt(HBASE_INDEX_MAX_FIELD_LENGTH, Integer.MAX_VALUE);
+  }
+
+  public void setMaxFieldLength(int maxFieldLength) {
+    setInt(HBASE_INDEX_MAX_FIELD_LENGTH, maxFieldLength);
+  }
+
+  public int getMaxMergeDocs() {
+    return getInt(HBASE_INDEX_MAX_MERGE_DOCS, Integer.MAX_VALUE);
+  }
+
+  public void setMaxMergeDocs(int maxMergeDocs) {
+    setInt(HBASE_INDEX_MAX_MERGE_DOCS, maxMergeDocs);
+  }
+
+  public int getMergeFactor() {
+    return getInt(HBASE_INDEX_MERGE_FACTOR, 10);
+  }
+
+  public void setMergeFactor(int mergeFactor) {
+    setInt(HBASE_INDEX_MERGE_FACTOR, mergeFactor);
+  }
+
+  public String getRowkeyName() {
+    return get(HBASE_INDEX_ROWKEY_NAME, "ROWKEY");
+  }
+
+  public void setRowkeyName(String rowkeyName) {
+    set(HBASE_INDEX_ROWKEY_NAME, rowkeyName);
+  }
+
+  public String getSimilarityName() {
+    return get(HBASE_INDEX_SIMILARITY_NAME, null);
+  }
+
+  public void setSimilarityName(String similarityName) {
+    set(HBASE_INDEX_SIMILARITY_NAME, similarityName);
+  }
+
+  public boolean isUseCompoundFile() {
+    return getBoolean(HBASE_INDEX_USE_COMPOUND_FILE, false);
+  }
+
+  public void setUseCompoundFile(boolean useCompoundFile) {
+    setBoolean(HBASE_INDEX_USE_COMPOUND_FILE, useCompoundFile);
+  }
+
+  public boolean doOptimize() {
+    return getBoolean(HBASE_INDEX_OPTIMIZE, true);
+  }
+
+  public void setDoOptimize(boolean doOptimize) {
+    setBoolean(HBASE_INDEX_OPTIMIZE, doOptimize);
+  }
+
+  public void addFromXML(String content) {
+    try {
+      DocumentBuilder builder = DocumentBuilderFactory.newInstance()
+          .newDocumentBuilder();
+
+      Document doc = builder
+          .parse(new ByteArrayInputStream(content.getBytes()));
+
+      Element root = doc.getDocumentElement();
+      if (!"configuration".equals(root.getTagName())) {
+        LOG.fatal("bad conf file: top-level element not <configuration>");
+      }
+
+      NodeList props = root.getChildNodes();
+      for (int i = 0; i < props.getLength(); i++) {
+        Node propNode = props.item(i);
+        if (!(propNode instanceof Element)) {
+          continue;
+        }
+
+        Element prop = (Element) propNode;
+        if ("property".equals(prop.getTagName())) {
+          propertyFromXML(prop, null);
+        } else if ("column".equals(prop.getTagName())) {
+          columnConfFromXML(prop);
+        } else {
+          LOG.warn("bad conf content: element neither <property> nor <column>");
+        }
+      }
+    } catch (Exception e) {
+      LOG.fatal("error parsing conf content: " + e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void propertyFromXML(Element prop, Properties properties) {
+    NodeList fields = prop.getChildNodes();
+    String attr = null;
+    String value = null;
+
+    for (int j = 0; j < fields.getLength(); j++) {
+      Node fieldNode = fields.item(j);
+      if (!(fieldNode instanceof Element)) {
+        continue;
+      }
+
+      Element field = (Element) fieldNode;
+      if ("name".equals(field.getTagName())) {
+        attr = ((Text) field.getFirstChild()).getData();
+      }
+      if ("value".equals(field.getTagName()) && field.hasChildNodes()) {
+        value = ((Text) field.getFirstChild()).getData();
+      }
+    }
+
+    if (attr != null && value != null) {
+      if (properties == null) {
+        set(attr, value);
+      } else {
+        properties.setProperty(attr, value);
+      }
+    }
+  }
+
+  private void columnConfFromXML(Element column) {
+    ColumnConf columnConf = new ColumnConf();
+    NodeList props = column.getChildNodes();
+    for (int i = 0; i < props.getLength(); i++) {
+      Node propNode = props.item(i);
+      if (!(propNode instanceof Element)) {
+        continue;
+      }
+
+      Element prop = (Element) propNode;
+      if ("property".equals(prop.getTagName())) {
+        propertyFromXML(prop, columnConf);
+      } else {
+        LOG.warn("bad conf content: element not <property>");
+      }
+    }
+
+    if (columnConf.getProperty(HBASE_COLUMN_NAME) != null) {
+      columnMap.put(columnConf.getProperty(HBASE_COLUMN_NAME), columnConf);
+    } else {
+      LOG.warn("bad column conf: name not specified");
+    }
+  }
+
+  public void write(OutputStream out) {
+    try {
+      Document doc = writeDocument();
+      DOMSource source = new DOMSource(doc);
+      StreamResult result = new StreamResult(out);
+      TransformerFactory transFactory = TransformerFactory.newInstance();
+      Transformer transformer = transFactory.newTransformer();
+      transformer.transform(source, result);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Document writeDocument() {
+    Iterator<Map.Entry<String, String>> iter = iterator();
+    try {
+      Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
+          .newDocument();
+      Element conf = doc.createElement("configuration");
+      doc.appendChild(conf);
+      conf.appendChild(doc.createTextNode("\n"));
+
+      Map.Entry<String, String> entry;
+      while (iter.hasNext()) {
+        entry = iter.next();
+        String name = entry.getKey();
+        String value = entry.getValue();
+        writeProperty(doc, conf, name, value);
+      }
+
+      Iterator<String> columnIter = columnNameIterator();
+      while (columnIter.hasNext()) {
+        writeColumn(doc, conf, columnIter.next());
+      }
+
+      return doc;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void writeProperty(Document doc, Element parent, String name,
+      String value) {
+    Element propNode = doc.createElement("property");
+    parent.appendChild(propNode);
+
+    Element nameNode = doc.createElement("name");
+    nameNode.appendChild(doc.createTextNode(name));
+    propNode.appendChild(nameNode);
+
+    Element valueNode = doc.createElement("value");
+    valueNode.appendChild(doc.createTextNode(value));
+    propNode.appendChild(valueNode);
+
+    parent.appendChild(doc.createTextNode("\n"));
+  }
+
+  private void writeColumn(Document doc, Element parent, String columnName) {
+    Element column = doc.createElement("column");
+    parent.appendChild(column);
+    column.appendChild(doc.createTextNode("\n"));
+
+    ColumnConf columnConf = getColumn(columnName);
+    for (Map.Entry<Object, Object> entry : columnConf.entrySet()) {
+      if (entry.getKey() instanceof String
+          && entry.getValue() instanceof String) {
+        writeProperty(doc, column, (String) entry.getKey(), (String) entry
+            .getValue());
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringWriter writer = new StringWriter();
+    try {
+      Document doc = writeDocument();
+      DOMSource source = new DOMSource(doc);
+      StreamResult result = new StreamResult(writer);
+      TransformerFactory transFactory = TransformerFactory.newInstance();
+      Transformer transformer = transFactory.newTransformer();
+      transformer.transform(source, result);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return writer.toString();
+  }
+}
\ No newline at end of file
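
For reference, a minimal usage sketch (not part of the commit above) showing how an
IndexConfiguration might be populated and handed to a job through the
"hbase.index.conf" property that the classes added later in this commit read back
with addFromXML(); the column name "info:title" and the bare JobConf are
illustrative only:

  import org.apache.hadoop.hbase.mapreduce.IndexConfiguration;
  import org.apache.hadoop.mapred.JobConf;

  public class IndexConfExample {
    public static void main(String[] args) {
      IndexConfiguration indexConf = new IndexConfiguration();
      // "info:title" is a made-up column name used only for illustration.
      indexConf.setIndex("info:title", true);      // index this column
      indexConf.setTokenize("info:title", true);   // pass it through the analyzer
      indexConf.setStore("info:title", false);     // do not store the raw value
      indexConf.setBoost("info:title", 2.0f);
      indexConf.setRowkeyName("ROWKEY");

      // toString() serializes the configuration to XML; consumers such as
      // IndexTableReduce and IndexOutputFormat rebuild it via addFromXML().
      JobConf job = new JobConf();
      job.set("hbase.index.conf", indexConf.toString());
    }
  }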

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/IndexOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/IndexOutputFormat.java?rev=785076&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/IndexOutputFormat.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/IndexOutputFormat.java Tue Jun 16 04:33:56 2009
@@ -0,0 +1,163 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.search.Similarity;
+
+/**
+ * Create a local index, unwrap Lucene documents created by reduce, add them to
+ * the index, and copy the index to the destination.
+ */
+public class IndexOutputFormat extends
+    FileOutputFormat<ImmutableBytesWritable, LuceneDocumentWrapper> {
+  static final Log LOG = LogFactory.getLog(IndexOutputFormat.class);
+
+  private Random random = new Random();
+
+  @Override
+  public RecordWriter<ImmutableBytesWritable, LuceneDocumentWrapper>
+  getRecordWriter(final FileSystem fs, JobConf job, String name,
+      final Progressable progress)
+  throws IOException {
+
+    final Path perm = new Path(FileOutputFormat.getOutputPath(job), name);
+    final Path temp = job.getLocalPath("index/_"
+        + Integer.toString(random.nextInt()));
+
+    LOG.info("To index into " + perm);
+
+    // delete old, if any
+    fs.delete(perm, true);
+
+    final IndexConfiguration indexConf = new IndexConfiguration();
+    String content = job.get("hbase.index.conf");
+    if (content != null) {
+      indexConf.addFromXML(content);
+    }
+
+    String analyzerName = indexConf.getAnalyzerName();
+    Analyzer analyzer;
+    try {
+      Class<?> analyzerClass = Class.forName(analyzerName);
+      analyzer = (Analyzer) analyzerClass.newInstance();
+    } catch (Exception e) {
+      throw new IOException("Error in creating an analyzer object "
+          + analyzerName);
+    }
+
+    // build locally first
+    final IndexWriter writer = new IndexWriter(fs.startLocalOutput(perm, temp)
+        .toString(), analyzer, true);
+
+    // no delete, so no need for maxBufferedDeleteTerms
+    writer.setMaxBufferedDocs(indexConf.getMaxBufferedDocs());
+    writer.setMaxFieldLength(indexConf.getMaxFieldLength());
+    writer.setMaxMergeDocs(indexConf.getMaxMergeDocs());
+    writer.setMergeFactor(indexConf.getMergeFactor());
+    String similarityName = indexConf.getSimilarityName();
+    if (similarityName != null) {
+      try {
+        Class<?> similarityClass = Class.forName(similarityName);
+        Similarity similarity = (Similarity) similarityClass.newInstance();
+        writer.setSimilarity(similarity);
+      } catch (Exception e) {
+        throw new IOException("Error in creating a similarty object "
+            + similarityName);
+      }
+    }
+    writer.setUseCompoundFile(indexConf.isUseCompoundFile());
+
+    return new RecordWriter<ImmutableBytesWritable, LuceneDocumentWrapper>() {
+      boolean closed;
+      private long docCount = 0;
+
+      public void write(ImmutableBytesWritable key, 
+          LuceneDocumentWrapper value)
+      throws IOException {
+        // unwrap and index doc
+        Document doc = value.get();
+        writer.addDocument(doc);
+        docCount++;
+        progress.progress();
+      }
+
+      public void close(final Reporter reporter) throws IOException {
+        // spawn a thread to give progress heartbeats
+        Thread prog = new Thread() {
+          @Override
+          public void run() {
+            while (!closed) {
+              try {
+                reporter.setStatus("closing");
+                Thread.sleep(1000);
+              } catch (InterruptedException e) {
+                continue;
+              } catch (Throwable e) {
+                return;
+              }
+            }
+          }
+        };
+
+        try {
+          prog.start();
+
+          // optimize index
+          if (indexConf.doOptimize()) {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("Optimizing index.");
+            }
+            writer.optimize();
+          }
+
+          // close index
+          writer.close();
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Done indexing " + docCount + " docs.");
+          }
+
+          // copy to perm destination in dfs
+          fs.completeLocalOutput(perm, temp);
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Copy done.");
+          }
+        } finally {
+          closed = true;
+        }
+      }
+    };
+  }
+}

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/IndexTableReduce.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/IndexTableReduce.java?rev=785076&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/IndexTableReduce.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/IndexTableReduce.java Tue Jun 16 04:33:56 2009
@@ -0,0 +1,109 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Construct a Lucene document per row, which is consumed by IndexOutputFormat
+ * to build a Lucene index
+ */
+public class IndexTableReduce extends MapReduceBase implements
+    Reducer<ImmutableBytesWritable, Result, ImmutableBytesWritable, LuceneDocumentWrapper> {
+  private static final Log LOG = LogFactory.getLog(IndexTableReduce.class);
+  private IndexConfiguration indexConf;
+
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    indexConf = new IndexConfiguration();
+    String content = job.get("hbase.index.conf");
+    if (content != null) {
+      indexConf.addFromXML(content);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Index conf: " + indexConf);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
+
+  public void reduce(ImmutableBytesWritable key, Iterator<Result> values,
+      OutputCollector<ImmutableBytesWritable, LuceneDocumentWrapper> output,
+      Reporter reporter)
+  throws IOException {
+    if (!values.hasNext()) {
+      return;
+    }
+
+    Document doc = new Document();
+
+    // index and store row key, row key already UTF-8 encoded
+    Field keyField = new Field(indexConf.getRowkeyName(),
+      Bytes.toString(key.get(), key.getOffset(), key.getLength()),
+      Field.Store.YES, Field.Index.UN_TOKENIZED);
+    keyField.setOmitNorms(true);
+    doc.add(keyField);
+
+    while (values.hasNext()) {
+      Result value = values.next();
+
+      // each column (name-value pair) is a field (name-value pair)
+      for (KeyValue kv: value.list()) {
+        // name is already UTF-8 encoded
+        String column = Bytes.toString(kv.getColumn());
+        byte[] columnValue = kv.getValue();
+        Field.Store store = indexConf.isStore(column)?
+          Field.Store.YES: Field.Store.NO;
+        Field.Index index = indexConf.isIndex(column)?
+          (indexConf.isTokenize(column)?
+            Field.Index.TOKENIZED: Field.Index.UN_TOKENIZED):
+            Field.Index.NO;
+
+        // UTF-8 encode value
+        Field field = new Field(column, Bytes.toString(columnValue), 
+          store, index);
+        field.setBoost(indexConf.getBoost(column));
+        field.setOmitNorms(indexConf.isOmitNorms(column));
+
+        doc.add(field);
+      }
+    }
+    output.collect(key, new LuceneDocumentWrapper(doc));
+  }
+}
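
As a rough wiring sketch (assumptions noted in the comments), this is how
IndexTableReduce and IndexOutputFormat might be combined in a JobConf; the output
path is made up, and the map side is assumed to emit (row key, Result) pairs, for
example a mapper set up with TableMapReduceUtil.initTableMapJob() added later in
this commit:

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.mapreduce.IndexOutputFormat;
  import org.apache.hadoop.hbase.mapreduce.IndexTableReduce;
  import org.apache.hadoop.hbase.mapreduce.LuceneDocumentWrapper;
  import org.apache.hadoop.mapred.FileOutputFormat;
  import org.apache.hadoop.mapred.JobConf;

  public class IndexJobWiring {
    // indexConfXml is the XML produced by IndexConfiguration.toString().
    static void wireIndexBuild(JobConf job, String indexConfXml) {
      job.set("hbase.index.conf", indexConfXml);
      // Map side is assumed to emit (ImmutableBytesWritable, Result) pairs.
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(Result.class);
      // Reduce side turns each row into a Lucene document ...
      job.setReducerClass(IndexTableReduce.class);
      job.setOutputKeyClass(ImmutableBytesWritable.class);
      job.setOutputValueClass(LuceneDocumentWrapper.class);
      // ... and IndexOutputFormat writes the documents into a Lucene index
      // under the job output directory ("/tmp/example-index" is made up).
      job.setOutputFormat(IndexOutputFormat.class);
      FileOutputFormat.setOutputPath(job, new Path("/tmp/example-index"));
    }
  }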

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/LuceneDocumentWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/LuceneDocumentWrapper.java?rev=785076&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/LuceneDocumentWrapper.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/LuceneDocumentWrapper.java Tue Jun 16 04:33:56 2009
@@ -0,0 +1,55 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import org.apache.hadoop.io.Writable;
+import org.apache.lucene.document.Document;
+
+/**
+ * A utility class used to pass a Lucene document from the reducer to the
+ * OutputFormat. It does not actually serialize/deserialize the Lucene document.
+ */
+public class LuceneDocumentWrapper implements Writable {
+  protected Document doc;
+
+  /**
+   * @param doc
+   */
+  public LuceneDocumentWrapper(Document doc) {
+    this.doc = doc;
+  }
+
+  /**
+   * @return the document
+   */
+  public Document get() {
+    return doc;
+  }
+
+  public void readFields(DataInput in) {
+    // intentionally left blank
+  }
+
+  public void write(DataOutput out) {
+    // intentionally left blank
+  }
+}

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java?rev=785076&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java Tue Jun 16 04:33:56 2009
@@ -0,0 +1,134 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A job with a map to count rows.
+ * The map increments a ROWS counter for every input row that has at least
+ * one column with non-empty content.  Uses an {@link IdentityReducer}.
+ */
+public class RowCounter extends Configured implements Tool {
+  // Name of this 'program'
+  static final String NAME = "rowcounter";
+
+  /**
+   * Mapper that runs the count.
+   */
+  static class RowCounterMapper
+  implements TableMap<ImmutableBytesWritable, Result> {
+    private static enum Counters {ROWS}
+
+    public void map(ImmutableBytesWritable row, Result values,
+        OutputCollector<ImmutableBytesWritable, Result> output,
+        Reporter reporter)
+    throws IOException {
+      boolean content = false;
+      for (KeyValue value: values.list()) {
+        if (value.getValue().length > 0) {
+          content = true;
+          break;
+        }
+      }
+      if (!content) {
+        // Don't count rows that are all empty values.
+        return;
+      }
+      // Give out same value every time.  We're only interested in the row/key
+      reporter.incrCounter(Counters.ROWS, 1);
+    }
+
+    public void configure(JobConf jc) {
+      // Nothing to do.
+    }
+
+    public void close() throws IOException {
+      // Nothing to do.
+    }
+  }
+
+  /**
+   * @param args
+   * @return the JobConf
+   * @throws IOException
+   */
+  public JobConf createSubmittableJob(String[] args) throws IOException {
+    JobConf c = new JobConf(getConf(), getClass());
+    c.setJobName(NAME);
+    // Columns are space delimited
+    StringBuilder sb = new StringBuilder();
+    final int columnoffset = 2;
+    for (int i = columnoffset; i < args.length; i++) {
+      if (i > columnoffset) {
+        sb.append(" ");
+      }
+      sb.append(args[i]);
+    }
+    // Second argument is the table name.
+    TableMapReduceUtil.initTableMapJob(args[1], sb.toString(),
+      RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, c);
+    c.setNumReduceTasks(0);
+    // First arg is the output directory.
+    FileOutputFormat.setOutputPath(c, new Path(args[0]));
+    return c;
+  }
+  
+  static int printUsage() {
+    System.out.println(NAME +
+      " <outputdir> <tablename> <column1> [<column2>...]");
+    return -1;
+  }
+  
+  public int run(final String[] args) throws Exception {
+    // Make sure there are at least 3 parameters
+    if (args.length < 3) {
+      System.err.println("ERROR: Wrong number of parameters: " + args.length);
+      return printUsage();
+    }
+    JobClient.runJob(createSubmittableJob(args));
+    return 0;
+  }
+
+  /**
+   * @param args
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    HBaseConfiguration c = new HBaseConfiguration();
+    int errCode = ToolRunner.run(c, new RowCounter(), args);
+    System.exit(errCode);
+  }
+}
\ No newline at end of file
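
A minimal invocation sketch following the argument order shown in printUsage();
the output directory, table and column names below are made up:

  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.mapreduce.RowCounter;
  import org.apache.hadoop.util.ToolRunner;

  public class RowCounterExample {
    public static void main(String[] args) throws Exception {
      // Argument order per printUsage(): <outputdir> <tablename> <column1> ...
      int rc = ToolRunner.run(new HBaseConfiguration(), new RowCounter(),
          new String[] { "/tmp/rowcounter-out", "mytable", "info:col1" });
      System.exit(rc);
    }
  }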

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter_Counters.properties
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter_Counters.properties?rev=785076&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter_Counters.properties (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter_Counters.properties Tue Jun 16 04:33:56 2009
@@ -0,0 +1,6 @@
+
+# ResourceBundle properties file for RowCounter MR job
+
+CounterGroupName=         RowCounter
+
+ROWS.name=                Rows

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java?rev=785076&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java Tue Jun 16 04:33:56 2009
@@ -0,0 +1,82 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Convert HBase tabular data into a format that is consumable by Map/Reduce.
+ */
+public class TableInputFormat extends TableInputFormatBase implements
+    JobConfigurable {
+  private final Log LOG = LogFactory.getLog(TableInputFormat.class);
+
+  /**
+   * space delimited list of columns
+   */
+  public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
+
+  public void configure(JobConf job) {
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    String colArg = job.get(COLUMN_LIST);
+    String[] colNames = colArg.split(" ");
+    byte [][] m_cols = new byte[colNames.length][];
+    for (int i = 0; i < m_cols.length; i++) {
+      m_cols[i] = Bytes.toBytes(colNames[i]);
+    }
+    setInputColumns(m_cols);
+    try {
+      setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
+    } catch (Exception e) {
+      LOG.error(StringUtils.stringifyException(e));
+    }
+  }
+
+  public void validateInput(JobConf job) throws IOException {
+    // expecting exactly one path
+    Path [] tableNames = FileInputFormat.getInputPaths(job);
+    if (tableNames == null || tableNames.length > 1) {
+      throw new IOException("expecting one table name");
+    }
+
+    // connected to table?
+    if (getHTable() == null) {
+      throw new IOException("could not connect to table '" +
+        tableNames[0].getName() + "'");
+    }
+
+    // expecting at least one column
+    String colArg = job.get(COLUMN_LIST);
+    if (colArg == null || colArg.length() == 0) {
+      throw new IOException("expecting at least one column");
+    }
+  }
+}
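
For reference, a hand-wired sketch of the configuration that configure() and
validateInput() above expect: the table name is supplied as the single input path
and the columns as a space-delimited COLUMN_LIST; TableMapReduceUtil.initTableMapJob(),
added later in this commit, performs the same wiring. The table and column names
are made up:

  import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
  import org.apache.hadoop.mapred.FileInputFormat;
  import org.apache.hadoop.mapred.JobConf;

  public class TableInputWiring {
    static void wire(JobConf job) {
      job.setInputFormat(TableInputFormat.class);
      // The single input "path" is interpreted as the table name.
      FileInputFormat.addInputPaths(job, "mytable");
      // Space-delimited list of columns to scan.
      job.set(TableInputFormat.COLUMN_LIST, "info:col1 info:col2");
    }
  }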

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=785076&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java Tue Jun 16 04:33:56 2009
@@ -0,0 +1,347 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.filter.StopRowFilter;
+import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a
+ * byte[] of input columns and optionally a {@link RowFilterInterface}.
+ * Subclasses may use other TableRecordReader implementations.
+ * <p>
+ * An example of a subclass:
+ * <pre>
+ *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
+ *
+ *     public void configure(JobConf job) {
+ *       HTable exampleTable = new HTable(new HBaseConfiguration(job),
+ *         Bytes.toBytes("exampleTable"));
+ *       // mandatory
+ *       setHTable(exampleTable);
+ *       byte [][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ *         Bytes.toBytes("columnB") };
+ *       // mandatory
+ *       setInputColumns(inputColumns);
+ *       RowFilterInterface exampleFilter = new RegExpRowFilter("keyPrefix.*");
+ *       // optional
+ *       setRowFilter(exampleFilter);
+ *     }
+ *
+ *     public void validateInput(JobConf job) throws IOException {
+ *     }
+ *  }
+ * </pre>
+ */
+
+public abstract class TableInputFormatBase
+implements InputFormat<ImmutableBytesWritable, Result> {
+  final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
+  private byte [][] inputColumns;
+  private HTable table;
+  private TableRecordReader tableRecordReader;
+  private RowFilterInterface rowFilter;
+
+  /**
+   * Iterates over HBase table data, returning (ImmutableBytesWritable, Result) pairs.
+   */
+  protected class TableRecordReader
+  implements RecordReader<ImmutableBytesWritable, Result> {
+    private byte [] startRow;
+    private byte [] endRow;
+    private byte [] lastRow;
+    private RowFilterInterface trrRowFilter;
+    private ResultScanner scanner;
+    private HTable htable;
+    private byte [][] trrInputColumns;
+
+    /**
+     * Restart from survivable exceptions by creating a new scanner.
+     *
+     * @param firstRow
+     * @throws IOException
+     */
+    public void restart(byte[] firstRow) throws IOException {
+      if ((endRow != null) && (endRow.length > 0)) {
+        if (trrRowFilter != null) {
+          final Set<RowFilterInterface> rowFiltersSet =
+            new HashSet<RowFilterInterface>();
+          rowFiltersSet.add(new WhileMatchRowFilter(new StopRowFilter(endRow)));
+          rowFiltersSet.add(trrRowFilter);
+          Scan scan = new Scan(startRow);
+          scan.addColumns(trrInputColumns);
+//          scan.setFilter(new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
+//              rowFiltersSet));
+          this.scanner = this.htable.getScanner(scan);
+        } else {
+          Scan scan = new Scan(firstRow, endRow);
+          scan.addColumns(trrInputColumns);
+          this.scanner = this.htable.getScanner(scan);
+        }
+      } else {
+        Scan scan = new Scan(firstRow);
+        scan.addColumns(trrInputColumns);
+//        scan.setFilter(trrRowFilter);
+        this.scanner = this.htable.getScanner(scan);
+      }
+    }
+
+    /**
+     * Build the scanner. Not done in constructor to allow for extension.
+     *
+     * @throws IOException
+     */
+    public void init() throws IOException {
+      restart(startRow);
+    }
+
+    /**
+     * @param htable the {@link HTable} to scan.
+     */
+    public void setHTable(HTable htable) {
+      this.htable = htable;
+    }
+
+    /**
+     * @param inputColumns the columns to be placed in the returned {@link Result}.
+     */
+    public void setInputColumns(final byte [][] inputColumns) {
+      this.trrInputColumns = inputColumns;
+    }
+
+    /**
+     * @param startRow the first row in the split
+     */
+    public void setStartRow(final byte [] startRow) {
+      this.startRow = startRow;
+    }
+
+    /**
+     *
+     * @param endRow the last row in the split
+     */
+    public void setEndRow(final byte [] endRow) {
+      this.endRow = endRow;
+    }
+
+    /**
+     * @param rowFilter the {@link RowFilterInterface} to be used.
+     */
+    public void setRowFilter(RowFilterInterface rowFilter) {
+      this.trrRowFilter = rowFilter;
+    }
+
+    public void close() {
+      this.scanner.close();
+    }
+
+    /**
+     * @return ImmutableBytesWritable
+     *
+     * @see org.apache.hadoop.mapred.RecordReader#createKey()
+     */
+    public ImmutableBytesWritable createKey() {
+      return new ImmutableBytesWritable();
+    }
+
+    /**
+     * @return Result
+     *
+     * @see org.apache.hadoop.mapred.RecordReader#createValue()
+     */
+    public Result createValue() {
+      return new Result();
+    }
+
+    public long getPos() {
+      // This should be the ordinal tuple in the range;
+      // not clear how to calculate...
+      return 0;
+    }
+
+    public float getProgress() {
+      // Depends on the total number of tuples and getPos
+      return 0;
+    }
+
+    /**
+     * @param key ImmutableBytesWritable as input key.
+     * @param value Result as input value
+     * @return true if there was more data
+     * @throws IOException
+     */
+    public boolean next(ImmutableBytesWritable key, Result value)
+    throws IOException {
+      Result result;
+      try {
+        result = this.scanner.next();
+      } catch (UnknownScannerException e) {
+        LOG.debug("recovered from " + StringUtils.stringifyException(e));  
+        restart(lastRow);
+        this.scanner.next();    // skip presumed already mapped row
+        result = this.scanner.next();
+      }
+
+      if (result != null && result.size() > 0) {
+        key.set(result.getRow());
+        lastRow = key.get();
+        Writables.copyWritable(result, value);
+        return true;
+      }
+      return false;
+    }
+  }
+
+  /**
+   * Builds a TableRecordReader. If no TableRecordReader was provided, uses
+   * the default.
+   *
+   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
+   *      JobConf, Reporter)
+   */
+  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter)
+  throws IOException {
+    TableSplit tSplit = (TableSplit) split;
+    TableRecordReader trr = this.tableRecordReader;
+    // if no table record reader was provided use default
+    if (trr == null) {
+      trr = new TableRecordReader();
+    }
+    trr.setStartRow(tSplit.getStartRow());
+    trr.setEndRow(tSplit.getEndRow());
+    trr.setHTable(this.table);
+    trr.setInputColumns(this.inputColumns);
+    trr.setRowFilter(this.rowFilter);
+    trr.init();
+    return trr;
+  }
+
+  /**
+   * Calculates the splits that will serve as input for the map tasks.
+   * <p>
+   * Splits are created in a number equal to the smaller of numSplits and
+   * the number of {@link HRegion}s in the table. If the number of splits is
+   * smaller than the number of {@link HRegion}s then splits span
+   * multiple {@link HRegion}s and are grouped as evenly as possible. If the
+   * splits are uneven, the bigger splits are placed first in the
+   * {@link InputSplit} array.
+   *
+   * @param job the map task {@link JobConf}
+   * @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
+   *
+   * @return the input splits
+   *
+   * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
+   */
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    if (this.table == null) {
+      throw new IOException("No table was provided");
+    }
+    if (this.inputColumns == null || this.inputColumns.length == 0) {
+      throw new IOException("Expecting at least one column");
+    }
+    byte [][] startKeys = this.table.getStartKeys();
+    if (startKeys == null || startKeys.length == 0) {
+      throw new IOException("Expecting at least one region");
+    }
+    int realNumSplits = numSplits > startKeys.length? startKeys.length:
+      numSplits;
+    InputSplit[] splits = new InputSplit[realNumSplits];
+    int middle = startKeys.length / realNumSplits;
+    int startPos = 0;
+    for (int i = 0; i < realNumSplits; i++) {
+      int lastPos = startPos + middle;
+      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
+      String regionLocation = table.getRegionLocation(startKeys[startPos]).
+        getServerAddress().getHostname(); 
+      splits[i] = new TableSplit(this.table.getTableName(),
+        startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
+          HConstants.EMPTY_START_ROW, regionLocation);
+      LOG.info("split: " + i + "->" + splits[i]);
+      startPos = lastPos;
+    }
+    return splits;
+  }
+
+  /**
+   * @param inputColumns the columns to be passed in each {@link Result} to the map task.
+   */
+  protected void setInputColumns(byte [][] inputColumns) {
+    this.inputColumns = inputColumns;
+  }
+
+  /**
+   * Allows subclasses to get the {@link HTable}.
+   */
+  protected HTable getHTable() {
+    return this.table;
+  }
+
+  /**
+   * Allows subclasses to set the {@link HTable}.
+   *
+   * @param table to get the data from
+   */
+  protected void setHTable(HTable table) {
+    this.table = table;
+  }
+
+  /**
+   * Allows subclasses to set the {@link TableRecordReader}.
+   *
+   * @param tableRecordReader
+   *                to provide other {@link TableRecordReader} implementations.
+   */
+  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+    this.tableRecordReader = tableRecordReader;
+  }
+
+  /**
+   * Allows subclasses to set the {@link RowFilterInterface} to be used.
+   *
+   * @param rowFilter
+   */
+  protected void setRowFilter(RowFilterInterface rowFilter) {
+    this.rowFilter = rowFilter;
+  }
+}

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableMap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableMap.java?rev=785076&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableMap.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableMap.java Tue Jun 16 04:33:56 2009
@@ -0,0 +1,38 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Mapper;
+
+/**
+ * A Mapper over the rows of an HBase table: input keys are the row keys
+ * ({@link ImmutableBytesWritable}) and input values are the corresponding
+ * {@link Result}s.
+ *
+ * @param <K> WritableComparable key class
+ * @param <V> Writable value class
+ */
+public interface TableMap<K extends WritableComparable<K>, V extends Writable>
+extends Mapper<ImmutableBytesWritable, Result, K, V> {
+
+}

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=785076&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Tue Jun 16 04:33:56 2009
@@ -0,0 +1,171 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Utility for {@link TableMap} and {@link TableReduce}
+ */
+@SuppressWarnings("unchecked")
+public class TableMapReduceUtil {
+  
+  /**
+   * Use this before submitting a TableMap job. It will
+   * appropriately set up the JobConf.
+   * 
+   * @param table  The table name to read from.
+   * @param columns  The columns to scan.
+   * @param mapper  The mapper class to use.
+   * @param outputKeyClass  The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job  The current job configuration to adjust.
+   */
+  public static void initTableMapJob(String table, String columns,
+    Class<? extends TableMap> mapper, 
+    Class<? extends WritableComparable> outputKeyClass, 
+    Class<? extends Writable> outputValueClass, JobConf job) {
+      
+    job.setInputFormat(TableInputFormat.class);
+    job.setMapOutputValueClass(outputValueClass);
+    job.setMapOutputKeyClass(outputKeyClass);
+    job.setMapperClass(mapper);
+    FileInputFormat.addInputPaths(job, table);
+    job.set(TableInputFormat.COLUMN_LIST, columns);
+  }
+  
+  /**
+   * Use this before submitting a TableReduce job. It will
+   * appropriately set up the JobConf.
+   * 
+   * @param table  The output table.
+   * @param reducer  The reducer class to use.
+   * @param job  The current job configuration to adjust.
+   * @throws IOException When determining the region count fails. 
+   */
+  public static void initTableReduceJob(String table,
+    Class<? extends TableReduce> reducer, JobConf job)
+  throws IOException {
+    initTableReduceJob(table, reducer, job, null);
+  }
+
+  /**
+   * Use this before submitting a TableReduce job. It will
+   * appropriately set up the JobConf.
+   * 
+   * @param table  The output table.
+   * @param reducer  The reducer class to use.
+   * @param job  The current job configuration to adjust.
+   * @param partitioner  Partitioner to use. Pass <code>null</code> to use 
+   * default partitioner.
+   * @throws IOException When determining the region count fails. 
+   */
+  public static void initTableReduceJob(String table,
+    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
+  throws IOException {
+    job.setOutputFormat(TableOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(TableOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(Put.class);
+    if (partitioner == HRegionPartitioner.class) {
+      job.setPartitionerClass(HRegionPartitioner.class);
+      HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+      int regions = outputTable.getRegionsInfo().size();
+      if (job.getNumReduceTasks() > regions) {
+        job.setNumReduceTasks(outputTable.getRegionsInfo().size());
+      }
+    } else if (partitioner != null) {
+      job.setPartitionerClass(partitioner);
+    }
+  }
+  
+  /**
+   * Ensures that the given number of reduce tasks for the given job 
+   * configuration does not exceed the number of regions for the given table. 
+   * 
+   * @param table  The table to get the region count for.
+   * @param job  The current job configuration to adjust.
+   * @throws IOException When retrieving the table details fails.
+   */
+  public static void limitNumReduceTasks(String table, JobConf job) 
+  throws IOException { 
+    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+    int regions = outputTable.getRegionsInfo().size();
+    if (job.getNumReduceTasks() > regions)
+      job.setNumReduceTasks(regions);
+  }
+
+  /**
+   * Ensures that the given number of map tasks for the given job 
+   * configuration does not exceed the number of regions for the given table. 
+   * 
+   * @param table  The table to get the region count for.
+   * @param job  The current job configuration to adjust.
+   * @throws IOException When retrieving the table details fails.
+   */
+  public static void limitNumMapTasks(String table, JobConf job) 
+  throws IOException { 
+    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+    int regions = outputTable.getRegionsInfo().size();
+    if (job.getNumMapTasks() > regions)
+      job.setNumMapTasks(regions);
+  }
+
+  /**
+   * Sets the number of reduce tasks for the given job configuration to the 
+   * number of regions the given table has. 
+   * 
+   * @param table  The table to get the region count for.
+   * @param job  The current job configuration to adjust.
+   * @throws IOException When retrieving the table details fails.
+   */
+  public static void setNumReduceTasks(String table, JobConf job) 
+  throws IOException { 
+    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+    int regions = outputTable.getRegionsInfo().size();
+    job.setNumReduceTasks(regions);
+  }
+  
+  /**
+   * Sets the number of map tasks for the given job configuration to the 
+   * number of regions the given table has. 
+   * 
+   * @param table  The table to get the region count for.
+   * @param job  The current job configuration to adjust.
+   * @throws IOException When retrieving the table details fails.
+   */
+  public static void setNumMapTasks(String table, JobConf job) 
+  throws IOException { 
+    HTable outputTable = new HTable(new HBaseConfiguration(job), table);
+    int regions = outputTable.getRegionsInfo().size();
+    job.setNumMapTasks(regions);
+  }
+  
+}
\ No newline at end of file

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java?rev=785076&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java Tue Jun 16 04:33:56 2009
@@ -0,0 +1,105 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Convert Map/Reduce output and write it to an HBase table
+ */
+public class TableOutputFormat extends
+    FileOutputFormat<ImmutableBytesWritable, Put> {
+
+  /** JobConf parameter that specifies the output table */
+  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+  private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
+
+  /**
+   * Writes the Reduce output, (ImmutableBytesWritable, Put) pairs,
+   * to an HBase table
+   */
+  protected static class TableRecordWriter
+    implements RecordWriter<ImmutableBytesWritable, Put> {
+    private HTable m_table;
+
+    /**
+     * Instantiate a TableRecordWriter with an open HTable to write to.
+     * 
+     * @param table  The open HTable instance to write to.
+     */
+    public TableRecordWriter(HTable table) {
+      m_table = table;
+    }
+
+    public void close(Reporter reporter) 
+      throws IOException {
+      m_table.flushCommits();
+    }
+
+    public void write(ImmutableBytesWritable key,
+        Put value) throws IOException {
+      // Wrap in a new Put so the buffered copy is independent of the caller's instance.
+      m_table.put(new Put(value));
+    }
+  }
+  
+  @Override
+  @SuppressWarnings("unchecked")
+  public RecordWriter getRecordWriter(FileSystem ignored,
+      JobConf job, String name, Progressable progress) throws IOException {
+    
+    // The output path is unused; the target table name comes from the job configuration.
+    
+    String tableName = job.get(OUTPUT_TABLE);
+    HTable table = null;
+    try {
+      table = new HTable(new HBaseConfiguration(job), tableName);
+    } catch(IOException e) {
+      LOG.error(e);
+      throw e;
+    }
+    table.setAutoFlush(false);
+    return new TableRecordWriter(table);
+  }
+
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job)
+  throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+    
+    String tableName = job.get(OUTPUT_TABLE);
+    if(tableName == null) {
+      throw new IOException("Must specify table name");
+    }
+  }
+}

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableReduce.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableReduce.java?rev=785076&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableReduce.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableReduce.java Tue Jun 16 04:33:56 2009
@@ -0,0 +1,38 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Reducer;
+
+/**
+ * Write to an HBase table, sorting by the input key
+ *
+ * @param <K> key class
+ * @param <V> value class
+ */
+@SuppressWarnings("unchecked")
+public interface TableReduce<K extends WritableComparable, V extends Writable>
+extends Reducer<K, V, ImmutableBytesWritable, Put> {
+
+}
\ No newline at end of file

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java?rev=785076&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java Tue Jun 16 04:33:56 2009
@@ -0,0 +1,112 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * A table split corresponds to a key range [low, high)
+ */
+public class TableSplit implements InputSplit, Comparable<TableSplit> {
+  private byte [] m_tableName;
+  private byte [] m_startRow;
+  private byte [] m_endRow;
+  private String m_regionLocation;
+
+  /** default constructor */
+  public TableSplit() {
+    this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
+      HConstants.EMPTY_BYTE_ARRAY, "");
+  }
+
+  /**
+   * Constructor
+   * @param tableName
+   * @param startRow
+   * @param endRow
+   * @param location
+   */
+  public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
+      final String location) {
+    this.m_tableName = tableName;
+    this.m_startRow = startRow;
+    this.m_endRow = endRow;
+    this.m_regionLocation = location;
+  }
+
+  /** @return table name */
+  public byte [] getTableName() {
+    return this.m_tableName;
+  }
+
+  /** @return starting row key */
+  public byte [] getStartRow() {
+    return this.m_startRow;
+  }
+
+  /** @return end row key */
+  public byte [] getEndRow() {
+    return this.m_endRow;
+  }
+
+  /** @return the region's hostname */
+  public String getRegionLocation() {
+    return this.m_regionLocation;
+  }
+
+  public String[] getLocations() {
+    return new String[] {this.m_regionLocation};
+  }
+
+  public long getLength() {
+    // Not clear how to obtain this... seems to be used only for sorting splits
+    return 0;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    this.m_tableName = Bytes.readByteArray(in);
+    this.m_startRow = Bytes.readByteArray(in);
+    this.m_endRow = Bytes.readByteArray(in);
+    this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
+  }
+
+  public void write(DataOutput out) throws IOException {
+    Bytes.writeByteArray(out, this.m_tableName);
+    Bytes.writeByteArray(out, this.m_startRow);
+    Bytes.writeByteArray(out, this.m_endRow);
+    Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
+  }
+
+  @Override
+  public String toString() {
+    return m_regionLocation + ":" +
+      Bytes.toStringBinary(m_startRow) + "," + Bytes.toStringBinary(m_endRow);
+  }
+
+  public int compareTo(TableSplit o) {
+    return Bytes.compareTo(getStartRow(), o.getStartRow());
+  }
+}

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/package-info.java?rev=785076&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/package-info.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/mapreduce/package-info.java Tue Jun 16 04:33:56 2009
@@ -0,0 +1,267 @@
+/*
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+Provides HBase <a href="http://wiki.apache.org/hadoop/HadoopMapReduce">MapReduce</a>
+Input/OutputFormats, a table indexing MapReduce job, and utility methods.
+
+<h2>Table of Contents</h2>
+<ul>
+<li><a href="#classpath">HBase, MapReduce and the CLASSPATH</a></li>
+<li><a href="#sink">HBase as MapReduce job data source and sink</a></li>
+<li><a href="#examples">Example Code</a></li>
+</ul>
+
+<h2><a name="classpath">HBase, MapReduce and the CLASSPATH</a></h2>
+
+<p>MapReduce jobs deployed to a MapReduce cluster do not by default have access
+to the HBase configuration under <code>$HBASE_CONF_DIR</code> nor to HBase classes.
+You could add <code>hbase-site.xml</code> to <code>$HADOOP_HOME/conf</code>, add
+<code>hbase-X.X.X.jar</code> to <code>$HADOOP_HOME/lib</code>, and copy these
+changes across your cluster, but the cleanest means of adding hbase configuration
+and classes to the cluster <code>CLASSPATH</code> is by uncommenting
+<code>HADOOP_CLASSPATH</code> in <code>$HADOOP_HOME/conf/hadoop-env.sh</code>
+and adding the path to the hbase jar and the <code>$HBASE_CONF_DIR</code> directory,
+then copying the amended configuration around the cluster.
+You'll probably need to restart the MapReduce cluster if you want it to notice
+the new configuration.
+</p>
+
+<p>For example, here is how you would amend <code>hadoop-env.sh</code> to add the
+built hbase jar, the hbase conf directory, and the <code>PerformanceEvaluation</code>
+class from the built hbase test jar to the hadoop <code>CLASSPATH</code>:</p>
+
+<blockquote><pre># Extra Java CLASSPATH elements. Optional.
+# export HADOOP_CLASSPATH=
+export HADOOP_CLASSPATH=$HBASE_HOME/build/test:$HBASE_HOME/build/hbase-X.X.X.jar:$HBASE_HOME/build/hbase-X.X.X-test.jar:$HBASE_HOME/conf</pre></blockquote>
+
+<p>Expand <code>$HBASE_HOME</code> in the above appropriately to suit your
+local environment.</p>
+
+<p>After copying the above change around your cluster, this is how you would run
+the PerformanceEvaluation MR job to put up 4 clients (this presumes a running
+MapReduce cluster):
+
+<blockquote><pre>$HADOOP_HOME/bin/hadoop org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 4</pre></blockquote>
+
+The <code>PerformanceEvaluation</code> class will be found on the <code>CLASSPATH</code>
+because you added <code>$HBASE_HOME/build/test</code> to <code>HADOOP_CLASSPATH</code>.
+</p>
+
+<p>Another possibility, if for example you do not have access to hadoop-env.sh or
+are unable to restart the hadoop cluster, is bundling the hbase jar into your
+mapreduce job jar: add the jar and its dependencies under the job jar's
+<code>lib/</code> directory and the hbase conf under its <code>conf/</code> directory.
+</p>
+
+<h2><a name="sink">HBase as MapReduce job data source and sink</a></h2>
+
+<p>HBase can be used as a data source, {@link org.apache.hadoop.hbase.mapreduce.TableInputFormat TableInputFormat},
+and data sink, {@link org.apache.hadoop.hbase.mapreduce.TableOutputFormat TableOutputFormat}, for MapReduce jobs.
+When writing MapReduce jobs that read or write HBase, you'll probably want to subclass
+{@link org.apache.hadoop.hbase.mapreduce.TableMap TableMap} and/or
+{@link org.apache.hadoop.hbase.mapreduce.TableReduce TableReduce}.  See the do-nothing
+pass-through classes {@link org.apache.hadoop.hbase.mapreduce.IdentityTableMap IdentityTableMap} and
+{@link org.apache.hadoop.hbase.mapreduce.IdentityTableReduce IdentityTableReduce} for basic usage.  For a more
+involved example, see {@link org.apache.hadoop.hbase.mapreduce.BuildTableIndex BuildTableIndex}
+or review the <code>org.apache.hadoop.hbase.mapreduce.TestTableMapReduce</code> unit test.
+</p>
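+
+<p>As a rough sketch of the wiring (not taken from this commit), here is what a
+driver's <code>main</code> might look like using the <code>TableMapReduceUtil</code>
+helpers. The table names, the column list, and <code>MyTableMap</code> are
+illustrative placeholders; <code>MyTableMap</code> is assumed to be a TableMap
+subclass emitting (ImmutableBytesWritable, Put) pairs, and IdentityTableReduce is
+assumed to pass those Puts through unchanged:</p>
+
+<blockquote><pre>
+JobConf job = new JobConf(new HBaseConfiguration(), MyDriver.class);
+job.setJobName("table-source-and-sink");
+// Read from table 'source', scanning the 'content:' family.
+TableMapReduceUtil.initTableMapJob("source", "content:",
+    MyTableMap.class, ImmutableBytesWritable.class, Put.class, job);
+// Write the reduce output into table 'target' as Put instances.
+TableMapReduceUtil.initTableReduceJob("target", IdentityTableReduce.class, job);
+JobClient.runJob(job);
+</pre></blockquote>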
+
+<p>When running mapreduce jobs that have hbase as a source or sink, you'll need to
+specify the source/sink table and column names in your job configuration.</p>
+
+<p>When reading from hbase, the TableInputFormat asks hbase for the list of
+regions and makes a map per region, or <code>mapred.map.tasks</code> maps,
+whichever is smaller (if your job only has two maps, raise
+<code>mapred.map.tasks</code> to a number greater than the number of regions).
+Maps will run on the adjacent TaskTracker if you are running a TaskTracker and
+RegionServer per node.
+When writing, it may make sense to avoid the reduce step and write back into
+hbase from inside your map. You'd do this when your job does not need the sort
+and collation that mapreduce does on the map-emitted data; on insert,
+hbase 'sorts' anyway, so there is no point double-sorting (and shuffling data
+around your mapreduce cluster) unless you need to. If you do not need the
+reduce, you might just have your map emit counts of records processed so the
+framework's report at the end of your job has meaning, or set the number of
+reduces to zero and use TableOutputFormat. See the example code
+below. If running the reduce step makes sense in your case, it's usually better
+to have lots of reducers so load is spread across the hbase cluster.</p>
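+
+<p>A minimal sketch of that map-only pattern, assuming a mapper class
+<code>MyMap</code> that emits (ImmutableBytesWritable, Put) pairs (the class and
+table names are placeholders; the input format and input paths are omitted):</p>
+
+<blockquote><pre>
+JobConf job = new JobConf(new HBaseConfiguration(), MyDriver.class);
+job.setJobName("map-only-upload");
+job.setMapperClass(MyMap.class);
+job.setNumReduceTasks(0);                      // skip sort/shuffle and reduce
+job.setOutputFormat(TableOutputFormat.class);  // Puts are written straight to the table
+job.set(TableOutputFormat.OUTPUT_TABLE, "target");
+job.setOutputKeyClass(ImmutableBytesWritable.class);
+job.setOutputValueClass(Put.class);
+JobClient.runJob(job);
+</pre></blockquote>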
+
+<p>There is also a new hbase partitioner that will run as many reducers as there
+are currently existing regions.  The
+{@link org.apache.hadoop.hbase.mapreduce.HRegionPartitioner} is suitable
+when your table is large and your upload will not greatly alter the number of
+existing regions when done; otherwise use the default partitioner.
+</p>
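+
+<p>Hooking the partitioner up is a one-argument change when you use the utility
+class shown earlier (the reducer and table name are placeholders):</p>
+
+<blockquote><pre>
+// Passing the partitioner explicitly also caps the number of reduces at the
+// table's current region count inside initTableReduceJob.
+TableMapReduceUtil.initTableReduceJob("target", MyTableReduce.class, job,
+    HRegionPartitioner.class);
+</pre></blockquote>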
+
+<h2><a name="examples">Example Code</a></h2>
+<h3>Sample Row Counter</h3>
+<p>See {@link org.apache.hadoop.hbase.mapreduce.RowCounter}.  You should be able to run
+it by doing: <code>% ./bin/hadoop jar hbase-X.X.X.jar</code>.  This will invoke
+the hbase MapReduce Driver class.  Select 'rowcounter' from the choice of jobs
+offered. You may need to add the hbase conf directory to <code>$HADOOP_HOME/conf/hadoop-env.sh#HADOOP_CLASSPATH</code>
+so the rowcounter gets pointed at the right hbase cluster (or, build a new jar
+with an appropriate hbase-site.xml built into your job jar).
+</p>
+<h3>PerformanceEvaluation</h3>
+<p>See org.apache.hadoop.hbase.PerformanceEvaluation from hbase src/test.  It runs
+a mapreduce job that launches concurrent clients reading and writing hbase.
+</p>
+
+<h3>Sample MR Bulk Uploader</h3>
+<p>A students/classes example based on a contribution by Naama Kraus, with lots of
+documentation, can be found over in src/examples/mapred.
+It's the <code>org.apache.hadoop.hbase.mapreduce.SampleUploader</code> class.
+Just copy it under src/java/org/apache/hadoop/hbase/mapred to compile and try it
+(until we start generating an hbase examples jar).  The class reads a data file
+from HDFS and, per line, does an upload to HBase using TableReduce.
+Read the class comment for a specification of inputs, prerequisites, etc.
+</p>
+
+<h3>Example to bulk import/load a text file into an HTable</h3>
+
+<p>Here's a sample program from 
+<a href="http://www.spicylogic.com/allenday/blog/category/computing/distributed-systems/hadoop/hbase/">Allen Day</a>
+that takes an HDFS text file path and an HBase table name as inputs, and loads
+the contents of the text file into the table, entirely in the map phase.
+</p>
+
+<blockquote><pre>
+package com.spicylogic.hbase;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Class that adds the parsed line from the input to hbase
+ * in the map function.  Map has no emissions and job
+ * has no reduce.
+ *&#x2f;
+public class BulkImport implements Tool {
+  private static final String NAME = "BulkImport";
+  private Configuration conf;
+
+  public static class InnerMap extends MapReduceBase implements Mapper&lt;LongWritable, Text, Text, Text> {
+    private HTable table;
+    private HBaseConfiguration HBconf;
+
+    public void map(LongWritable key, Text value,
+        OutputCollector&lt;Text, Text> output, Reporter reporter)
+    throws IOException {
+      if ( table == null )
+        throw new IOException("table is null");
+      
+      // Split input line on tab character
+      String [] splits = value.toString().split("\t");
+      if ( splits.length != 4 )
+        return;
+      
+      String rowID = splits[0];
+      int timestamp  = Integer.parseInt( splits[1] );
+      String colID = splits[2];
+      String cellValue = splits[3];
+
+      reporter.setStatus("Map emitting cell for row='" + rowID +
+          "', column='" + colID + "', time='" + timestamp + "'");
+
+      BatchUpdate bu = new BatchUpdate( rowID );
+      if ( timestamp > 0 )
+        bu.setTimestamp( timestamp );
+
+      bu.put(colID, cellValue.getBytes());      
+      table.commit( bu );      
+    }
+
+    public void configure(JobConf job) {
+      HBconf = new HBaseConfiguration(job);
+      try {
+        table = new HTable( HBconf, job.get("input.table") );
+      } catch (IOException e) {
+        // Leave table null; map() will fail fast if the table could not be opened.
+        e.printStackTrace();
+      }
+    }
+  }
+  
+  public JobConf createSubmittableJob(String[] args) {
+    JobConf c = new JobConf(getConf(), BulkImport.class);
+    c.setJobName(NAME);
+    FileInputFormat.setInputPaths(c, new Path(args[0]));
+
+    c.set("input.table", args[1]);
+    c.setMapperClass(InnerMap.class);
+    c.setNumReduceTasks(0);
+    c.setOutputFormat(NullOutputFormat.class);
+    return c;
+  }
+  
+  static int printUsage() {
+    System.err.println("Usage: " + NAME + " &lt;input> &lt;table_name>");
+    System.err.println("\twhere &lt;input> is a tab-delimited text file with 4 columns.");
+    System.err.println("\t\tcolumn 1 = row ID");
+    System.err.println("\t\tcolumn 2 = timestamp (use a negative value for current time)");
+    System.err.println("\t\tcolumn 3 = column ID");
+    System.err.println("\t\tcolumn 4 = cell value");
+    return -1;
+  } 
+
+  public int run(String[] args) throws Exception {
+    // Make sure there are exactly 2 parameters left.
+    if (args.length != 2) {
+      return printUsage();
+    }
+    JobClient.runJob(createSubmittableJob(args));
+    return 0;
+  }
+
+  public Configuration getConf() {
+    return this.conf;
+  } 
+
+  public void setConf(final Configuration c) {
+    this.conf = c;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int errCode = ToolRunner.run(new Configuration(), new BulkImport(), args);
+    System.exit(errCode);
+  }
+}
+</pre></blockquote>
+
+*/
+package org.apache.hadoop.hbase.mapreduce;

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/BaseScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/BaseScanner.java?rev=785076&r1=785075&r2=785076&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/BaseScanner.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/BaseScanner.java Tue Jun 16 04:33:56 2009
@@ -38,8 +38,9 @@
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.HLog;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -147,16 +148,18 @@
 
     // Array to hold list of split parents found.  Scan adds to list.  After
     // scan we go check if parents can be removed.
-    Map<HRegionInfo, RowResult> splitParents =
-      new HashMap<HRegionInfo, RowResult>();
+    Map<HRegionInfo, Result> splitParents =
+      new HashMap<HRegionInfo, Result>();
     List<byte []> emptyRows = new ArrayList<byte []>();
     int rows = 0;
     try {
       regionServer = master.connection.getHRegionConnection(region.getServer());
+      
       scannerId = regionServer.openScanner(region.getRegionName(),
-        COLUMN_FAMILY_ARRAY, EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP, null);
+          new Scan().addFamily(HConstants.CATALOG_FAMILY));
       while (true) {
-        RowResult values = regionServer.next(scannerId);
+        Result values = regionServer.next(scannerId);
+        
         if (values == null || values.size() == 0) {
           break;
         }
@@ -165,8 +168,16 @@
           emptyRows.add(values.getRow());
           continue;
         }
-        String serverName = Writables.cellToString(values.get(COL_SERVER));
-        long startCode = Writables.cellToLong(values.get(COL_STARTCODE));
+        String serverName = "";
+        byte [] val = values.getValue(CATALOG_FAMILY, SERVER_QUALIFIER);
+        if( val != null) {
+          serverName = Bytes.toString(val);
+        }
+        long startCode = 0L;
+        val = values.getValue(CATALOG_FAMILY, STARTCODE_QUALIFIER);
+        if(val != null) {
+          startCode = Bytes.toLong(val);
+        }
 
         // Note Region has been assigned.
         checkAssigned(info, serverName, startCode);
@@ -213,7 +224,7 @@
     // Take a look at split parents to see if any we can clean up.
     
     if (splitParents.size() > 0) {
-      for (Map.Entry<HRegionInfo, RowResult> e : splitParents.entrySet()) {
+      for (Map.Entry<HRegionInfo, Result> e : splitParents.entrySet()) {
         HRegionInfo hri = e.getKey();
         cleanupSplits(region.getRegionName(), regionServer, hri, e.getValue());
       }
@@ -250,13 +261,13 @@
    */
   private boolean cleanupSplits(final byte [] metaRegionName, 
     final HRegionInterface srvr, final HRegionInfo parent,
-    RowResult rowContent)
+    Result rowContent)
   throws IOException {
     boolean result = false;
     boolean hasReferencesA = hasReferences(metaRegionName, srvr,
-        parent.getRegionName(), rowContent, COL_SPLITA);
+        parent.getRegionName(), rowContent, CATALOG_FAMILY, SPLITA_QUALIFIER);
     boolean hasReferencesB = hasReferences(metaRegionName, srvr,
-        parent.getRegionName(), rowContent, COL_SPLITB);
+        parent.getRegionName(), rowContent, CATALOG_FAMILY, SPLITB_QUALIFIER);
     if (!hasReferencesA && !hasReferencesB) {
       LOG.info("Deleting region " + parent.getRegionNameAsString() +
         " (encoded=" + parent.getEncodedName() +
@@ -283,15 +294,16 @@
    */
   private boolean hasReferences(final byte [] metaRegionName, 
     final HRegionInterface srvr, final byte [] parent,
-    RowResult rowContent, final byte [] splitColumn)
+    Result rowContent, final byte [] splitFamily, byte [] splitQualifier)
   throws IOException {
     boolean result = false;
     HRegionInfo split =
-      Writables.getHRegionInfo(rowContent.get(splitColumn));
+      Writables.getHRegionInfoOrNull(rowContent.getValue(splitFamily, splitQualifier));
     if (split == null) {
       return result;
     }
-    Path tabledir = new Path(this.master.rootdir, split.getTableDesc().getNameAsString());
+    Path tabledir =
+      new Path(this.master.rootdir, split.getTableDesc().getNameAsString());
     for (HColumnDescriptor family: split.getTableDesc().getFamilies()) {
       Path p = Store.getStoreHomedir(tabledir, split.getEncodedName(),
         family.getName());
@@ -316,14 +328,15 @@
     }
     
     if (LOG.isDebugEnabled()) {
-      LOG.debug(split.getRegionNameAsString() +
-        " no longer has references to " + Bytes.toString(parent));
+      LOG.debug(split.getRegionNameAsString() + "/" + split.getEncodedName()
+          + " no longer has references to " + Bytes.toStringBinary(parent)
+         );
     }
     
-    BatchUpdate b = new BatchUpdate(parent);
-    b.delete(splitColumn);
-    srvr.batchUpdate(metaRegionName, b, -1L);
-      
+    Delete delete = new Delete(parent);
+    delete.deleteColumns(splitFamily, splitQualifier);
+    srvr.delete(metaRegionName, delete);
+    
     return result;
   }
 

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java?rev=785076&r1=785075&r2=785076&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java Tue Jun 16 04:33:56 2009
@@ -27,6 +27,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.util.Writables;
@@ -78,11 +80,13 @@
       }
 
       // Update meta table
-      BatchUpdate b = new BatchUpdate(i.getRegionName());
-      updateRegionInfo(b, i);
-      b.delete(COL_SERVER);
-      b.delete(COL_STARTCODE);
-      server.batchUpdate(m.getRegionName(), b, -1L);
+      Put put = updateRegionInfo(i);
+      server.put(m.getRegionName(), put);
+      
+      Delete delete = new Delete(i.getRegionName());
+      delete.deleteColumns(CATALOG_FAMILY, SERVER_QUALIFIER);
+      delete.deleteColumns(CATALOG_FAMILY, STARTCODE_QUALIFIER);
+      server.delete(m.getRegionName(), delete);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Updated columns in row: " + i.getRegionNameAsString());
       }
@@ -125,9 +129,11 @@
     servedRegions.clear();
   }
 
-  protected void updateRegionInfo(final BatchUpdate b, final HRegionInfo i)
+  protected Put updateRegionInfo(final HRegionInfo i)
   throws IOException {
     i.setOffline(!online);
-    b.put(COL_REGIONINFO, Writables.getBytes(i));
+    Put put = new Put(i.getRegionName());
+    put.add(CATALOG_FAMILY, REGIONINFO_QUALIFIER, Writables.getBytes(i));
+    return put;
   }
 }


