hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nspiegelb...@apache.org
Subject svn commit: r1043318 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
Date Wed, 08 Dec 2010 06:37:06 GMT
Author: nspiegelberg
Date: Wed Dec  8 06:37:06 2010
New Revision: 1043318

URL: http://svn.apache.org/viewvc?rev=1043318&view=rev
Log:
HBASE-1861 : Multi-Family support for bulk upload tools

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1043318&r1=1043317&r2=1043318&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Wed Dec  8 06:37:06 2010
@@ -58,7 +58,8 @@ import org.apache.commons.logging.LogFac
  * Currently, can only write files to a single column family at a
  * time.  Multiple column families requires coordinating keys cross family.
  * Writes current time as the sequence id for the file. Sets the major compacted
- * attribute on created hfiles.
+ * attribute on created hfiles. Calling write(null,null) will forceably roll
+ * all HFiles being written.
  * @see KeyValueSortReducer
  */
 public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
@@ -72,9 +73,10 @@ public class HFileOutputFormat extends F
     Configuration conf = context.getConfiguration();
     final FileSystem fs = outputdir.getFileSystem(conf);
     // These configs. are from hbase-*.xml
-    final long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456);
-    final int blocksize =
-      conf.getInt("hbase.mapreduce.hfileoutputformat.blocksize", 65536);
+    final long maxsize = conf.getLong("hbase.hregion.max.filesize",
+        HConstants.DEFAULT_MAX_FILE_SIZE);
+    final int blocksize = conf.getInt("hfile.min.blocksize.size",
+        HFile.DEFAULT_BLOCKSIZE);
     // Invented config.  Add to hbase-*.xml if other than default compression.
     final String compression = conf.get("hfile.compression",
       Compression.Algorithm.NONE.getName());
@@ -85,48 +87,77 @@ public class HFileOutputFormat extends F
         new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
       private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
       private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
+      private boolean rollRequested = false;
 
       public void write(ImmutableBytesWritable row, KeyValue kv)
       throws IOException {
+        // null input == user explicitly wants to flush
+        if (row == null && kv == null) {
+          rollWriters();
+          return;
+        }
+
+        byte [] rowKey = kv.getRow();
         long length = kv.getLength();
         byte [] family = kv.getFamily();
         WriterLength wl = this.writers.get(family);
-        if (wl == null || ((length + wl.written) >= maxsize) &&
-            Bytes.compareTo(this.previousRow, 0, this.previousRow.length,
-              kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0) {
-          // Get a new writer.
-          Path basedir = new Path(outputdir, Bytes.toString(family));
-          if (wl == null) {
-            wl = new WriterLength();
-            this.writers.put(family, wl);
-            if (this.writers.size() > 1) throw new IOException("One family only");
-            // If wl == null, first file in family.  Ensure family dir exits.
-            if (!fs.exists(basedir)) fs.mkdirs(basedir);
-          }
-          wl.writer = getNewWriter(wl.writer, basedir);
-          LOG.info("Writer=" + wl.writer.getPath() +
-            ((wl.written == 0)? "": ", wrote=" + wl.written));
-          wl.written = 0;
+
+        // If this is a new column family, verify that the directory exists
+        if (wl == null) {
+          fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
         }
+
+        // If any of the HFiles for the column families has reached
+        // maxsize, we need to roll all the writers
+        if (wl != null && wl.written + length >= maxsize) {
+          this.rollRequested = true;
+        }
+
+        // This can only happen once a row is finished though
+        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+          rollWriters();
+        }
+
+        // create a new HLog writer, if necessary
+        if (wl == null || wl.writer == null) {
+          wl = getNewWriter(family);
+        }
+
+        // we now have the proper HLog writer. full steam ahead
         kv.updateLatestStamp(this.now);
         wl.writer.append(kv);
         wl.written += length;
+
         // Copy the row so we know when a row transition.
-        this.previousRow = kv.getRow();
+        this.previousRow = rowKey;
       }
 
-      /* Create a new HFile.Writer. Close current if there is one.
-       * @param writer
-       * @param familydir
-       * @return A new HFile.Writer.
+      private void rollWriters() throws IOException {
+        for (WriterLength wl : this.writers.values()) {
+          if (wl.writer != null) {
+            LOG.info("Writer=" + wl.writer.getPath() +
+                ((wl.written == 0)? "": ", wrote=" + wl.written));
+            close(wl.writer);
+          }
+          wl.writer = null;
+          wl.written = 0;
+        }
+        this.rollRequested = false;
+      }
+
+      /* Create a new HFile.Writer.
+       * @param family
+       * @return A WriterLength, containing a new HFile.Writer.
        * @throws IOException
        */
-      private HFile.Writer getNewWriter(final HFile.Writer writer,
-          final Path familydir)
-      throws IOException {
-        close(writer);
-        return new HFile.Writer(fs,  StoreFile.getUniqueFile(fs, familydir),
-          blocksize, compression, KeyValue.KEY_COMPARATOR);
+      private WriterLength getNewWriter(byte[] family) throws IOException {
+        WriterLength wl = new WriterLength();
+        Path familydir = new Path(outputdir, Bytes.toString(family));
+        wl.writer = new HFile.Writer(fs,
+          StoreFile.getUniqueFile(fs, familydir), blocksize,
+          compression, KeyValue.KEY_COMPARATOR);
+        this.writers.put(family, wl);
+        return wl;
       }
 
       private void close(final HFile.Writer w) throws IOException {
@@ -143,8 +174,8 @@ public class HFileOutputFormat extends F
 
       public void close(TaskAttemptContext c)
       throws IOException, InterruptedException {
-        for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) {
-          close(e.getValue().writer);
+        for (WriterLength wl: this.writers.values()) {
+          close(wl.writer);
         }
       }
     };

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java?rev=1043318&r1=1043317&r2=1043318&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java Wed Dec  8 06:37:06 2010
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.TreeSet;
 
@@ -26,6 +27,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Emits sorted Puts.
@@ -46,21 +48,37 @@ public class PutSortReducer extends
               ImmutableBytesWritable, KeyValue>.Context context)
       throws java.io.IOException, InterruptedException
   {
-    TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
-  
-    for (Put p : puts) {
-      for (List<KeyValue> kvs : p.getFamilyMap().values()) {
-        for (KeyValue kv : kvs) {
-          map.add(kv.clone());
+    // although reduce() is called per-row, handle pathological case
+    long threshold = context.getConfiguration().getLong(
+        "putsortreducer.row.threshold", 2L * (1<<30));
+    Iterator<Put> iter = puts.iterator();
+    while (iter.hasNext()) {
+      TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+      long curSize = 0;
+      // stop at the end or the RAM threshold
+      while (iter.hasNext() && curSize < threshold) {
+        Put p = iter.next();
+        for (List<KeyValue> kvs : p.getFamilyMap().values()) {
+          for (KeyValue kv : kvs) {
+            map.add(kv);
+            curSize += kv.getValueLength();
+          }
         }
       }
-    }
-    context.setStatus("Read " + map.getClass());
-    int index = 0;
-    for (KeyValue kv : map) {
-      context.write(row, kv);
-      if (index > 0 && index % 100 == 0)
-        context.setStatus("Wrote " + index);
+      context.setStatus("Read " + map.size() + " entries of " + map.getClass()
+          + "(" + StringUtils.humanReadableInt(curSize) + ")");
+      int index = 0;
+      for (KeyValue kv : map) {
+        context.write(row, kv);
+        if (index > 0 && index % 100 == 0)
+          context.setStatus("Wrote " + index);
+      }
+
+      // if we have more entries to process
+      if (iter.hasNext()) {
+        // force flush because we cannot guarantee intra-row sorted order
+        context.write(null, null);
+      }
     }
   }
 }

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java?rev=1043318&r1=1043317&r2=1043318&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java Wed Dec  8 06:37:06 2010
@@ -41,6 +41,9 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PerformanceEvaluation;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
@@ -65,7 +68,9 @@ import org.mockito.Mockito;
 public class TestHFileOutputFormat  {
   private final static int ROWSPERSPLIT = 1024;
 
-  private static final byte[] FAMILY_NAME = PerformanceEvaluation.FAMILY_NAME;
+  private static final byte[][] FAMILIES
+    = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
+      , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
   private static final byte[] TABLE_NAME = Bytes.toBytes("TestTable");
   
   private HBaseTestingUtility util = new HBaseTestingUtility();
@@ -119,9 +124,11 @@ public class TestHFileOutputFormat  {
         random.nextBytes(valBytes);
         ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
 
-        KeyValue kv = new KeyValue(keyBytes, PerformanceEvaluation.FAMILY_NAME,
-            PerformanceEvaluation.QUALIFIER_NAME, valBytes);
-        context.write(key, kv);
+        for (byte[] family : TestHFileOutputFormat.FAMILIES) {
+          KeyValue kv = new KeyValue(keyBytes, family,
+              PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+          context.write(key, kv);
+        }
       }
     }
   }
@@ -268,14 +275,12 @@ public class TestHFileOutputFormat  {
     try {
       util.startMiniCluster();
       HBaseAdmin admin = new HBaseAdmin(conf);
-      HTable table = util.createTable(TABLE_NAME, FAMILY_NAME);
-      int numRegions = util.createMultiRegions(
-          util.getConfiguration(), table, FAMILY_NAME,
-          startKeys);
-      assertEquals("Should make 5 regions",
-          numRegions, 5);
+      HTable table = util.createTable(TABLE_NAME, FAMILIES);
       assertEquals("Should start with empty table",
           0, util.countRows(table));
+      int numRegions = util.createMultiRegions(
+          util.getConfiguration(), table, FAMILIES[0], startKeys);
+      assertEquals("Should make 5 regions", numRegions, 5);
 
       // Generate the bulk load files
       util.startMiniMapReduceCluster();
@@ -284,6 +289,19 @@ public class TestHFileOutputFormat  {
       assertEquals("HFOF should not touch actual table",
           0, util.countRows(table));
   
+
+      // Make sure that a directory was created for every CF
+      int dir = 0;
+      for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) {
+        for (byte[] family : FAMILIES) {
+          if (Bytes.toString(family).equals(f.getPath().getName())) {
+            ++dir;
+          }
+        }
+      }
+      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
+
+      // handle the split case
       if (shouldChangeRegions) {
         LOG.info("Changing regions in table");
         admin.disableTable(table.getTableName());
@@ -293,8 +311,8 @@ public class TestHFileOutputFormat  {
           LOG.info("Waiting on table to finish disabling");
         }
         byte[][] newStartKeys = generateRandomStartKeys(15);
-        util.createMultiRegions(util.getConfiguration(),
-            table, FAMILY_NAME, newStartKeys);
+        util.createMultiRegions(
+            util.getConfiguration(), table, FAMILIES[0], newStartKeys);
         admin.enableTable(table.getTableName());
         while (table.getRegionsInfo().size() != 15 ||
             !admin.isTableAvailable(table.getTableName())) {
@@ -310,6 +328,19 @@ public class TestHFileOutputFormat  {
       int expectedRows = conf.getInt("mapred.map.tasks", 1) * ROWSPERSPLIT;
       assertEquals("LoadIncrementalHFiles should put expected data in table",
           expectedRows, util.countRows(table));
+      Scan scan = new Scan();
+      ResultScanner results = table.getScanner(scan);
+      int count = 0;
+      for (Result res : results) {
+        count++;
+        assertEquals(FAMILIES.length, res.raw().length);
+        KeyValue first = res.raw()[0];
+        for (KeyValue kv : res.raw()) {
+          assertTrue(KeyValue.COMPARATOR.matchingRows(first, kv));
+          assertTrue(Bytes.equals(first.getValue(), kv.getValue()));
+        }
+      }
+      results.close();
       String tableDigestBefore = util.checksumRows(table);
             
       // Cause regions to reopen
@@ -351,11 +382,11 @@ public class TestHFileOutputFormat  {
     util = new HBaseTestingUtility(conf);
     if ("newtable".equals(args[0])) {
       byte[] tname = args[1].getBytes();
-      HTable table = util.createTable(tname, FAMILY_NAME);
+      HTable table = util.createTable(tname, FAMILIES);
       HBaseAdmin admin = new HBaseAdmin(conf);
       admin.disableTable(tname);
-      util.createMultiRegions(conf, table, FAMILY_NAME,
-          generateRandomStartKeys(5));
+      byte[][] startKeys = generateRandomStartKeys(5);
+      util.createMultiRegions(conf, table, FAMILIES[0], startKeys);
       admin.enableTable(tname);
     } else if ("incremental".equals(args[0])) {
       byte[] tname = args[1].getBytes();



Mime
View raw message