hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r818188 - in /hadoop/hbase/branches/0.20: ./ bin/ src/java/org/apache/hadoop/hbase/mapreduce/ src/java/org/apache/hadoop/hbase/regionserver/ src/test/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/mapreduce/
Date Wed, 23 Sep 2009 18:39:34 GMT
Author: stack
Date: Wed Sep 23 18:39:33 2009
New Revision: 818188

URL: http://svn.apache.org/viewvc?rev=818188&view=rev
Log:
HBASE-48 [hbase] Bulk load tools

Added:
    hadoop/hbase/branches/0.20/bin/loadtable.rb
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
Modified:
    hadoop/hbase/branches/0.20/CHANGES.txt
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/mapreduce/package-info.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java

Modified: hadoop/hbase/branches/0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/CHANGES.txt?rev=818188&r1=818187&r2=818188&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.20/CHANGES.txt Wed Sep 23 18:39:33 2009
@@ -43,6 +43,7 @@
    HBASE-1684  Backup (Export/Import) contrib tool for 0.20
    HBASE-1854  Remove the Region Historian
    HBASE-1860  Change HTablePool#createHTable from private to protected
+   HBASE-48    Bulk load tools
 
 Release 0.20.0 - Tue Sep  8 12:48:41 PDT 2009
 

Added: hadoop/hbase/branches/0.20/bin/loadtable.rb
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/bin/loadtable.rb?rev=818188&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20/bin/loadtable.rb (added)
+++ hadoop/hbase/branches/0.20/bin/loadtable.rb Wed Sep 23 18:39:33 2009
@@ -0,0 +1,132 @@
+# Script that takes over from org.apache.hadoop.hbase.mapreduce.HFileOutputFormat.
+# Pass it output directory of HFileOutputFormat. It will read the passed files,
+# move them into place and update the catalog table appropriately.  Warning:
+# it will overwrite anything that exists already for passed table.
+# It expects hbase to be up and running so it can insert table info.
+#
+# To see usage for this script, run: 
+#
+#  ${HBASE_HOME}/bin/hbase org.jruby.Main loadtable.rb
+#
+include Java
+import java.util.TreeMap
+import org.apache.hadoop.hbase.client.HTable
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.util.FSUtils
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.util.Writables
+import org.apache.hadoop.hbase.HConstants
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.HRegionInfo
+import org.apache.hadoop.hbase.HTableDescriptor
+import org.apache.hadoop.hbase.HColumnDescriptor
+import org.apache.hadoop.hbase.HRegionInfo
+import org.apache.hadoop.hbase.io.hfile.HFile
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.mapred.OutputLogFilter
+import org.apache.commons.logging.Log
+import org.apache.commons.logging.LogFactory
+
+# Name of this script
+NAME = "loadtable"
+
+# Print usage for this script
+def usage
+  puts 'Usage: %s.rb TABLENAME HFILEOUTPUTFORMAT_OUTPUT_DIR' % NAME
+  exit!
+end
+
+# Passed 'dir' exists and is a directory else exception
+def isDirExists(fs, dir)
+  raise IOError.new("Does not exit: " + dir.toString()) unless fs.exists(dir)
+  raise IOError.new("Not a directory: " + dir.toString()) unless fs.isDirectory(dir)
+end
+
+# Check arguments
+if ARGV.size != 2
+  usage
+end
+
+# Check good table names were passed.
+tableName = HTableDescriptor.isLegalTableName(ARGV[0].to_java_bytes)
+outputdir = Path.new(ARGV[1])
+
+# Get configuration to use.
+c = HBaseConfiguration.new()
+# Get a logger and a metautils instance.
+LOG = LogFactory.getLog(NAME)
+
+# Set hadoop filesystem configuration using the hbase.rootdir.
+# Otherwise, we'll always use localhost though the hbase.rootdir
+# might be pointing at hdfs location.
+c.set("fs.default.name", c.get(HConstants::HBASE_DIR))
+fs = FileSystem.get(c)
+
+# If hfiles directory does not exist, exit.
+isDirExists(fs, outputdir)
+# Create table dir if it doesn't exist.
+rootdir = FSUtils.getRootDir(c)
+tableDir = Path.new(rootdir, Path.new(Bytes.toString(tableName)))
+fs.mkdirs(tableDir) unless fs.exists(tableDir)
+
+# Start. Per hfile, move it, and insert an entry in catalog table.
+families = fs.listStatus(outputdir, OutputLogFilter.new())
+throw IOError.new("Can do one family only") if families.length > 1
+# Read meta on all files. Put in map keyed by end key.
+map = TreeMap.new(Bytes::ByteArrayComparator.new())
+family = families[0]
+# Make sure this subdir exists under table
+hfiles = fs.listStatus(family.getPath())
+LOG.info("Found " + hfiles.length.to_s + " hfiles");
+count = 0
+for hfile in hfiles
+  reader = HFile::Reader.new(fs, hfile.getPath(), nil, false)
+  begin
+    fileinfo = reader.loadFileInfo() 
+    lastkey = reader.getLastKey()
+    # Last key is row/column/ts.  We just want the row part.
+    rowlen = Bytes.toShort(lastkey)
+    LOG.info(count.to_s + " read lastrow of " +
+      Bytes.toString(lastkey[2, rowlen]) + " from " + hfile.getPath().toString())
+    map.put(lastkey[2, rowlen], [hfile, fileinfo])
+    count = count + 1
+  ensure
+    reader.close()
+  end
+end
+# Now I have sorted list of fileinfo+paths.  Start insert.
+# Get a client on catalog table.
+meta = HTable.new(c, HConstants::META_TABLE_NAME)
+# I can't find out from hfile how its compressed.
+# Using all defaults. Change manually after loading if
+# something else wanted in column or table attributes.
+familyName = family.getPath().getName()
+hcd = HColumnDescriptor.new(familyName)
+htd = HTableDescriptor.new(tableName)
+htd.addFamily(hcd)
+previouslastkey = HConstants::EMPTY_START_ROW
+count = 0
+for i in map.keySet()
+  tuple = map.get(i)
+  startkey = previouslastkey
+  count = 1 + count
+  lastkey = i
+  if count == map.size()
+    # Then we are at last key. Set it to special indicator
+    lastkey = HConstants::EMPTY_START_ROW
+  end
+  previouslastkey = lastkey
+  hri = HRegionInfo.new(htd, startkey, lastkey)  
+  LOG.info(hri.toString())
+  hfile = tuple[0].getPath()
+  rdir = Path.new(Path.new(tableDir, hri.getEncodedName().to_s), familyName)
+  fs.mkdirs(rdir)
+  tgt = Path.new(rdir, hfile.getName())
+  fs.rename(hfile, tgt)
+  LOG.info("Moved " + hfile.toString() + " to " + tgt.toString())
+  p = Put.new(hri.getRegionName())
+  p.add(HConstants::CATALOG_FAMILY, HConstants::REGIONINFO_QUALIFIER, Writables.getBytes(hri))
+  meta.put(p)
+  LOG.info("Inserted " + hri.toString())
+end

Added: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=818188&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
(added)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
Wed Sep 23 18:39:33 2009
@@ -0,0 +1,134 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.mortbay.log.Log;
+
+/**
+ * Writes HFiles. Passed KeyValues must arrive in order.
+ * Currently, can only write files to a single column family at a
+ * time.  Multiple column families requires coordinating keys cross family.
+ * Writes current time as the sequence id for the file. Sets the major compacted
+ * attribute on created hfiles.
+ * @see KeyValueSortReducer
+ */
+public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue>
{
+  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(TaskAttemptContext
context)
+  throws IOException, InterruptedException {
+    // Get the path of the temporary output file 
+    final Path outputdir = FileOutputFormat.getOutputPath(context);
+    Configuration conf = context.getConfiguration();
+    final FileSystem fs = outputdir.getFileSystem(conf);
+    // These configs. are from hbase-*.xml
+    final long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456);
+    final int blocksize = conf.getInt("hfile.min.blocksize.size", 65536);
+    // Invented config.  Add to hbase-*.xml if other than default compression.
+    final String compression = conf.get("hfile.compression",
+      Compression.Algorithm.NONE.getName());
+
+    return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
+      // Map of families to writers and how much has been output on the writer.
+      private final Map<byte [], WriterLength> writers =
+        new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
+      private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
+
+      public void write(ImmutableBytesWritable row, KeyValue kv)
+      throws IOException {
+        long length = kv.getLength();
+        byte [] family = kv.getFamily();
+        WriterLength wl = this.writers.get(family);
+        if (wl == null || ((length + wl.written) >= maxsize) &&
+            Bytes.compareTo(this.previousRow, 0, this.previousRow.length,
+              kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0) {
+          // Get a new writer.
+          Path basedir = new Path(outputdir, Bytes.toString(family));
+          if (wl == null) {
+            wl = new WriterLength();
+            this.writers.put(family, wl);
+            if (this.writers.size() > 1) throw new IOException("One family only");
+            // If wl == null, first file in family.  Ensure family dir exits.
+            if (!fs.exists(basedir)) fs.mkdirs(basedir);
+          }
+          wl.writer = getNewWriter(wl.writer, basedir);
+          Log.info("Writer=" + wl.writer.getPath() +
+            ((wl.written == 0)? "": ", wrote=" + wl.written));
+          wl.written = 0;
+        }
+        wl.writer.append(kv);
+        wl.written += length;
+        // Copy the row so we know when a row transition.
+        this.previousRow = kv.getRow();
+      }
+
+      /* Create a new HFile.Writer. Close current if there is one.
+       * @param writer
+       * @param familydir
+       * @return A new HFile.Writer.
+       * @throws IOException
+       */
+      private HFile.Writer getNewWriter(final HFile.Writer writer,
+          final Path familydir)
+      throws IOException {
+        close(writer);
+        return new HFile.Writer(fs,  StoreFile.getUniqueFile(fs, familydir),
+          blocksize, compression, KeyValue.KEY_COMPARATOR);
+      }
+
+      private void close(final HFile.Writer w) throws IOException {
+        if (w != null) {
+          StoreFile.appendMetadata(w, System.currentTimeMillis(), true);
+          w.close();
+        }
+      }
+
+      public void close(TaskAttemptContext c)
+      throws IOException, InterruptedException {
+        for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) {
+          close(e.getValue().writer);
+        }
+      }
+    };
+  }
+
+  /*
+   * Data structure to hold a Writer and amount of data written on it. 
+   */
+  static class WriterLength {
+    long written = 0;
+    HFile.Writer writer = null;
+  }
+}

Added: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java?rev=818188&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
(added)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
Wed Sep 23 18:39:33 2009
@@ -0,0 +1,50 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Emits sorted KeyValues.
+ * Reads in all KeyValues from passed Iterator, sorts them, then emits
+ * KeyValues in sorted order.  If lots of columns per row, it will use lots of
+ * memory sorting.
+ * @see HFileOutputFormat
+ */
+public class KeyValueSortReducer extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable,
KeyValue> {
+  protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<KeyValue> kvs,
+      org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable,
KeyValue>.Context context)
+  throws java.io.IOException, InterruptedException {
+    TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+    for (KeyValue kv: kvs) {
+      map.add(kv);
+    }
+    context.setStatus("Read " + map.getClass());
+    int index = 0;
+    for (KeyValue kv: map) {
+      context.write(row, kv);
+      if (index > 0 && index % 100 == 0) context.setStatus("Wrote " + index);
+    }
+  }
+}

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/mapreduce/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/mapreduce/package-info.java?rev=818188&r1=818187&r2=818188&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/mapreduce/package-info.java
(original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/mapreduce/package-info.java
Wed Sep 23 18:39:33 2009
@@ -25,6 +25,7 @@
 <ul>
 <li><a href="#classpath">HBase, MapReduce and the CLASSPATH</a></li>
 <li><a href="#sink">HBase as MapReduce job data source and sink</a></li>
+<li><a href="#bulk">Bulk Import writing HFiles directly</a></li>
 <li><a href="#examples">Example Code</a></li>
 </ul>
 
@@ -82,7 +83,7 @@
 <p>Reading from hbase, the TableInputFormat asks hbase for the list of
 regions and makes a map-per-region or <code>mapred.map.tasks maps</code>,
 whichever is smaller (If your job only has two maps, up mapred.map.tasks
-to a number > number of regions). Maps will run on the adjacent TaskTracker
+to a number &gt; number of regions). Maps will run on the adjacent TaskTracker
 if you are running a TaskTracer and RegionServer per node.
 Writing, it may make sense to avoid the reduce step and write yourself back into
 hbase from inside your map. You'd do this when your job does not need the sort
@@ -103,6 +104,48 @@
 partitioner.
 </p>
 
+<h2><a name="bulk">Bulk import writing HFiles directly</a></h2>
+<p>If importing into a new table, its possible to by-pass the HBase API
+and write your content directly to the filesystem properly formatted as
+HBase data files (HFiles).  Your import will run faster, perhaps as much
+as an order of magnitude faster if not more.
+</p>
+<p>You will need to write a MapReduce job.  The map task will know how to
+pull from your data source.  Your reduce task will need to be hooked up to
+{@link org.apache.hadoop.hbase.mapreduce.HFileOutputFormat}.  It expects to receive a row
id and a value.
+The row id must be formatted as a {@link org.apache.hadoop.hbase.io.ImmutableBytesWritable}
and the
+value as a {@link org.apache.hadoop.hbase.KeyValue} (A KeyValue holds he value for a cell
and
+its coordinates; row/family/qualifier/timestamp, etc.).  Your reduce task
+will also need to emit the KeyValues in order.  See {@link org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer}
+for an example reducer that emits KeyValues in order.
+</p>
+<p>Most importantly, you will also need to ensure that your MapReduce job
+ensures a total ordering among all keys.  MapReduce by default distributes
+keys among reducers using a Partitioner that hashes on the map task output
+key: i.e. the reducer a key ends up in is by default determined as follows
+<code> (key.hashCode() &amp; Integer.MAX_VALUE) % numReduceTasks</code>.
+Keys are sorted by the MapReduce framework before they are passed to the reducer
+BUT the sort is scoped to the particular reducer.  Its not a global sort.
+Given the default hash Partitioner, if the keys were 0-4 (inclusive), and you
+had configured two reducers, reducer 0 would have get keys 0, 2 and 4 whereas
+reducer 1 would get keys 1 and 3 (in order).  For your bulk import to work,
+the keys need to be orderd so reducer 0 gets keys 0-2 and reducer 1 gets keys
+3-4 (See TotalOrderPartitioner up in hadoop for more on what this means). 
+To achieve total ordering, you will likely need to write a Partitioner
+that is intimate with your tables key namespace and that knows how
+to distribute keys among the reducers so a total order is maintained.
+</p>
+<p>See org.apache.hadoop.hbase.mapreduce.TestHFileOutputFormat for an example that
puts together
+{@link org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer} and {@link org.apache.hadoop.hbase.mapreduce.HFileOutputFormat}.</p>
+
+<p>HFileOutputFormat writes HFiles.  When your MapReduce file finishes, in your
+output directory you will have many HFiles.  Run the script <code>bin/loadtable.rb</code>
+to move the files from the MapReduce output directory under hbase.  See head of script
+for how to run it.  This script
+also adds the new table data to the hbase catalog tables.  When the script completes,
+on the next run of the hbase metascanner -- it usually runs every minute -- your
+new table should be visible and populated.</p>
+
 <h2><a name="examples">Example Code</a></h2>
 <h3>Sample Row Counter</h3>
 <p>See {@link org.apache.hadoop.hbase.mapreduce.RowCounter}.  This job uses

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=818188&r1=818187&r2=818188&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
(original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
Wed Sep 23 18:39:33 2009
@@ -397,15 +397,16 @@
 
   /**
    * @param fs
-   * @param p
+   * @param dir Directory to create file in.
    * @return random filename inside passed <code>dir</code>
    */
-  static Path getUniqueFile(final FileSystem fs, final Path p)
+  public static Path getUniqueFile(final FileSystem fs, final Path dir)
   throws IOException {
-    if (!fs.getFileStatus(p).isDir()) {
-      throw new IOException("Expecting a directory");
+    if (!fs.getFileStatus(dir).isDir()) {
+      throw new IOException("Expecting " + dir.toString() +
+        " to be a directory");
     }
-    return fs.getFileStatus(p).isDir()? getRandomFilename(fs, p): p;
+    return fs.getFileStatus(dir).isDir()? getRandomFilename(fs, dir): dir;
   }
 
   /**

Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java?rev=818188&r1=818187&r2=818188&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
(original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
Wed Sep 23 18:39:33 2009
@@ -94,8 +94,8 @@
   private static final int ONE_GB = 1024 * 1024 * 1000;
   private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH;
   
-  static final byte [] FAMILY_NAME = Bytes.toBytes("info");
-  static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
+  public static final byte [] FAMILY_NAME = Bytes.toBytes("info");
+  public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
   
   protected static final HTableDescriptor TABLE_DESCRIPTOR;
   static {
@@ -781,7 +781,7 @@
    * consumes about 30% of CPU time.
    * @return Generated random value to insert into a table cell.
    */
-  static byte[] generateValue(final Random r) {
+  public static byte[] generateValue(final Random r) {
     byte [] b = new byte [ROW_LENGTH];
     r.nextBytes(b);
     return b;

Added: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java?rev=818188&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
(added)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
Wed Sep 23 18:39:33 2009
@@ -0,0 +1,182 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.PerformanceEvaluation;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat}.
+ * Sets up and runs a mapreduce job that writes hfile output.
+ * Creates a few inner classes to implement splits and an inputformat that
+ * emits keys and values like those of {@link PerformanceEvaluation}.  Makes
+ * as many splits as "mapred.map.tasks" maps.
+ */
+public class TestHFileOutputFormat extends HBaseTestCase {
+  private final static int ROWSPERSPLIT = 1024;
+
+  /*
+   * InputFormat that makes keys and values like those used in
+   * PerformanceEvaluation.  Makes as many splits as there are configured
+   * maps ("mapred.map.tasks").
+   */
+  static class PEInputFormat extends InputFormat<ImmutableBytesWritable, ImmutableBytesWritable>
{
+    /* Split that holds nothing but split index.
+     */
+    static class PEInputSplit extends InputSplit implements Writable {
+      private int index = -1;
+   
+      PEInputSplit() {
+        super();
+      }
+
+      PEInputSplit(final int i) {
+        this.index = i;
+      }
+
+      int getIndex() {
+        return this.index;
+      }
+
+      public long getLength() throws IOException, InterruptedException {
+        return ROWSPERSPLIT;
+      }
+
+      public String [] getLocations() throws IOException, InterruptedException {
+        return new String [] {};
+      }
+
+      public void readFields(DataInput in) throws IOException {
+        this.index = in.readInt();
+      }
+
+      public void write(DataOutput out) throws IOException {
+        out.writeInt(this.index);
+      }
+    }
+
+    public RecordReader<ImmutableBytesWritable, ImmutableBytesWritable> createRecordReader(
+        InputSplit split, TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      final int startrow = ((PEInputSplit)split).getIndex() * ROWSPERSPLIT;
+      return new RecordReader<ImmutableBytesWritable, ImmutableBytesWritable>() {
+        // Starts at a particular row 
+        private int counter = startrow;
+        private ImmutableBytesWritable key;
+        private ImmutableBytesWritable value;
+        private final Random random = new Random(System.currentTimeMillis());
+        
+        public void close() throws IOException {
+          // Nothing to do.
+        }
+
+        public ImmutableBytesWritable getCurrentKey()
+        throws IOException, InterruptedException {
+          return this.key;
+        }
+
+        public ImmutableBytesWritable getCurrentValue()
+        throws IOException, InterruptedException {
+          return this.value;
+        }
+
+        public float getProgress() throws IOException, InterruptedException {
+          return (ROWSPERSPLIT - this.counter) / this.counter;
+        }
+
+        public void initialize(InputSplit arg0, TaskAttemptContext arg1)
+            throws IOException, InterruptedException {
+          // Nothing to do.
+          
+        }
+
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+          if (this.counter - startrow > ROWSPERSPLIT) return false;
+          this.counter++;
+          this.key = new ImmutableBytesWritable(PerformanceEvaluation.format(this.counter));
+          this.value = new ImmutableBytesWritable(PerformanceEvaluation.generateValue(this.random));
+          return true;
+        }
+      };
+    }
+
+    public List<InputSplit> getSplits(JobContext context)
+    throws IOException, InterruptedException {
+      int count = context.getConfiguration().getInt("mapred.map.tasks", 1);
+      List<InputSplit> splits = new ArrayList<InputSplit>(count);
+      for (int i = 0; i < count; i++) {
+        splits.add(new PEInputSplit(i));
+      }
+      return splits;
+    }
+  }
+
+  /**
+   * Simple mapper that makes KeyValue output.
+   */
+  static class PEtoKVMapper extends Mapper<ImmutableBytesWritable, ImmutableBytesWritable,
ImmutableBytesWritable, KeyValue> {
+    protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value,
+      org.apache.hadoop.mapreduce.Mapper<ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,KeyValue>.Context
context)
+    throws java.io.IOException ,InterruptedException {
+      context.write(key, new KeyValue(key.get(), PerformanceEvaluation.FAMILY_NAME,
+        PerformanceEvaluation.QUALIFIER_NAME, value.get()));
+    }
+  }
+
+  /**
+   * Run small MR job.
+   */
+  public void testWritingPEData() throws Exception {
+    // Set down this value or we OOME in eclipse.
+    this.conf.setInt("io.sort.mb", 20);
+    // Write a few files.
+    this.conf.setLong("hbase.hregion.max.filesize", 64 * 1024);
+    Job job = new Job(this.conf, getName());
+    job.setInputFormatClass(TestHFileOutputFormat.PEInputFormat.class);
+    job.setMapperClass(TestHFileOutputFormat.PEtoKVMapper.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    job.setMapOutputValueClass(KeyValue.class);
+    job.setReducerClass(KeyValueSortReducer.class);
+    job.setOutputFormatClass(HFileOutputFormat.class);
+    FileOutputFormat.setOutputPath(job, this.testDir);
+    assertTrue(job.waitForCompletion(false));
+    FileStatus [] files = this.fs.listStatus(this.testDir);
+    assertTrue(files.length > 0);
+  }
+}



Mime
View raw message