Subject: svn commit: r1181566 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/mapreduce/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hadoop/hbase/mapreduce/
Date: Tue, 11 Oct 2011 02:20:54 -0000
From: nspiegelberg@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Message-Id: <20111011022054.98995238888F@eris.apache.org>

Author: nspiegelberg
Date: Tue Oct 11 02:20:53 2011
New Revision: 1181566

URL: http://svn.apache.org/viewvc?rev=1181566&view=rev
Log:
add BloomFilter and timestamp range for bulk import

Summary: add BloomFilter and timestamp range for bulk import. This is the
version updated for HFile v2.

Test Plan: unit tests.

Reviewed By: kannan
Reviewers: nspiegelberg, kannan, kranganathan, mbautin
Commenters: kranganathan, nspiegelberg, mbautin
CC: hbase@lists, kranganathan, nspiegelberg, gqchen, mbautin, kannan
Differential Revision: 237110

Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
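The net effect: HFiles written for bulk import now carry each column
family's configured Bloom filter type and a timestamp range, where before
they carried neither. For reference, here is a minimal sketch (not part of
this commit) of the client side, a bulk-import driver that picks the new
settings up through configureIncrementalLoad(). MyImportMapper and the
output path are placeholders; everything else assumes the stock 0.89-era
mapreduce API.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class BulkImportJob {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "bulk-import");
        // MyImportMapper is a placeholder: any mapper that emits row keys
        // as ImmutableBytesWritable and cell data as KeyValue.
        job.setMapperClass(MyImportMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        HTable table = new HTable(conf, "TestTable");
        // Configures the TotalOrderPartitioner, the KeyValue-sorting
        // reducer, per-family compression, and (after this commit)
        // per-family Bloom filter types from the live table's descriptor.
        HFileOutputFormat.configureIncrementalLoad(job, table);

        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }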
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1181566&r1=1181565&r2=1181566&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Tue Oct 11 02:20:53 2011
@@ -47,6 +47,8 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -70,6 +72,9 @@ import org.apache.commons.logging.LogFac
 public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
   static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
   static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
+  // This stores a string in the format family1=bloomType1&family2=bloomType2&...&familyN=bloomTypeN
+  static final String BLOOMFILTER_TYPE_PER_CF_KEY =
+      "hbase.hfileoutputformat.families.bloomfilter.typePerCF";
 
   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
   throws IOException, InterruptedException {
@@ -90,7 +95,7 @@ public class HFileOutputFormat extends F
 
     // create a map from column family to the compression algorithm
     final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
-    final int bytesPerChecksum = HFile.getBytesPerChecksum(conf, conf);
+    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
 
     return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
       // Map of families to writers and how much has been output on the writer.
@@ -102,6 +107,7 @@ public class HFileOutputFormat extends F
 
       public void write(ImmutableBytesWritable row, KeyValue kv)
       throws IOException {
+        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
           rollWriters();
@@ -167,21 +173,35 @@ public class HFileOutputFormat extends F
         Path familydir = new Path(outputdir, Bytes.toString(family));
         String compression = compressionMap.get(family);
         compression = compression == null ? defaultCompression : compression;
-        wl.writer = HFile.getWriterFactory(conf).createWriter(fs,
-            StoreFile.getUniqueFile(fs, familydir), blocksize,
-            bytesPerChecksum, compression, KeyValue.KEY_COMPARATOR);
+        Compression.Algorithm compressionAlgo =
+            Compression.getCompressionAlgorithmByName(compression);
+
+        BloomType bloomType = bloomTypeMap.get(family);
+        if (bloomType == null) {
+          bloomType = BloomType.NONE;
+        }
+
+        /* new bloom filter does not require maxKeys. */
+        int maxKeys = 0;
+        wl.writer = StoreFile.createWriter(fs, familydir, blocksize,
+            compressionAlgo, KeyValue.COMPARATOR, conf, bloomType, maxKeys);
         this.writers.put(family, wl);
         return wl;
       }
 
-      private void close(final HFile.Writer w) throws IOException {
+      private void close(final StoreFile.Writer w) throws IOException {
         if (w != null) {
           w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
               Bytes.toBytes(System.currentTimeMillis()));
           w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
               Bytes.toBytes(context.getTaskAttemptID().toString()));
-          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
-              Bytes.toBytes(true));
+
+          /* Set maxSequenceId to be 0 for bulk imported files since
+           * these files do not correspond to any edit log items.
+           *
+           * Set majorCompaction flag to be false for bulk import file.
+           */
+          w.appendMetadata(0, false);
           w.close();
         }
       }
@@ -200,7 +220,7 @@ public class HFileOutputFormat extends F
    */
   static class WriterLength {
     long written = 0;
-    HFile.Writer writer = null;
+    StoreFile.Writer writer = null;
   }
 
   /**
@@ -317,6 +337,10 @@ public class HFileOutputFormat extends F
     // Set compression algorithms based on column families
     configureCompression(table, conf);
 
+    // Set BloomFilter type based on column families and
+    // relevant parameters.
+    configureBloomFilter(table, conf);
+
     LOG.info("Incremental table output configured.");
   }
 
@@ -379,4 +403,66 @@ public class HFileOutputFormat extends F
     // Get rid of the last ampersand
     conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
   }
+
+  private static void configureBloomFilter(HTable table, Configuration conf)
+      throws IOException {
+    // get conf information needed by BloomFilter
+    Configuration tableConf = table.getConfiguration();
+
+    // Now go through the column family and save the BloomFilter setting for
+    // each column family
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
+    if (tableDescriptor == null) {
+      return;
+    }
+
+    if (tableConf != null) {
+      // copying Bloom filter related configuration to conf.
+      BloomFilterFactory.copyBloomFilterConf(tableConf, conf);
+    }
+
+    StringBuilder bloomfilterTypePerCFConfigValue = new StringBuilder();
+    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    int i = 0;
+    for (HColumnDescriptor familyDescriptor : families) {
+      if (i++ > 0) {
+        bloomfilterTypePerCFConfigValue.append('&');
+      }
+
+      bloomfilterTypePerCFConfigValue.append(
+          URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+      bloomfilterTypePerCFConfigValue.append('=');
+      bloomfilterTypePerCFConfigValue.append(
+          URLEncoder.encode(familyDescriptor.getBloomFilterType().toString(),
+              "UTF-8"));
+    }
+
+    conf.set(BLOOMFILTER_TYPE_PER_CF_KEY, bloomfilterTypePerCFConfigValue.toString());
+  }
+
+  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
+    Map<byte[], BloomType> bloomTypeMap =
+        new TreeMap<byte[], BloomType>(Bytes.BYTES_COMPARATOR);
+    String bloomFilterTypeConf = conf.get(BLOOMFILTER_TYPE_PER_CF_KEY, "");
+
+    if (bloomFilterTypeConf.isEmpty()) {
+      return bloomTypeMap;
+    }
+
+    for (String familyConf : bloomFilterTypeConf.split("&")) {
+      String[] familySplit = familyConf.split("=");
+      if (familySplit.length != 2) {
+        throw new AssertionError("invalid bloomfilter type configuration");
+      }
+
+      try {
+        bloomTypeMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+            BloomType.valueOf(URLDecoder.decode(familySplit[1], "UTF-8")));
+      } catch (UnsupportedEncodingException e) {
+        // will not happen with UTF-8 encoding
+        throw new AssertionError(e);
+      }
+    }
+    return bloomTypeMap;
+  }
 }
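A quick round-trip of the new per-family setting, as a sketch rather than
part of the change. It has to live in org.apache.hadoop.hbase.mapreduce
because BLOOMFILTER_TYPE_PER_CF_KEY and createFamilyBloomTypeMap() are
package-private; the family names here are made up.

    package org.apache.hadoop.hbase.mapreduce;

    import java.net.URLEncoder;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BloomTypeRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Family names and Bloom types are URL-encoded "name=type" pairs
        // joined with '&', exactly as configureBloomFilter() writes them.
        conf.set(HFileOutputFormat.BLOOMFILTER_TYPE_PER_CF_KEY,
            URLEncoder.encode("info-A", "UTF-8") + "=ROW&"
                + URLEncoder.encode("info-B", "UTF-8") + "=ROWCOL");

        Map<byte[], BloomType> map =
            HFileOutputFormat.createFamilyBloomTypeMap(conf);
        // The TreeMap is built over Bytes.BYTES_COMPARATOR, so byte[]
        // keys compare by content rather than by reference.
        System.out.println(map.get(Bytes.toBytes("info-A")));  // ROW
        System.out.println(map.get(Bytes.toBytes("info-B")));  // ROWCOL
      }
    }

URL-encoding each name and type mirrors what configureCompression() already
does for COMPRESSION_CONF_KEY, so a family name containing '=' or '&'
cannot corrupt the pair encoding.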
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1181566&r1=1181565&r2=1181566&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Oct 11 02:20:53 2011
@@ -116,7 +116,7 @@ public class StoreFile {
   private static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");
 
   /** Key for Timerange information in metadata */
-  static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
+  public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
 
   // Make default block size for StoreFiles 8k while testing.  TODO: FIX!
   // Need to make it 8k for testing.
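TIMERANGE_KEY becomes public so code outside the regionserver package, like
the new test further down, can verify that bulk-written files record their
min/max timestamps; that metadata is what lets timestamp-restricted scans
skip store files whose range cannot overlap the query. A sketch of such a
check follows (not part of the commit; the class and method names are
invented, while the Reader constructor and the loadFileInfo() and close()
calls are the ones the test itself uses):

    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.regionserver.StoreFile;

    public class TimeRangeCheck {
      /** Fails loudly if a bulk-loaded HFile lacks timestamp-range info. */
      static void checkTimeRange(FileSystem fs, Path hfilePath)
          throws IOException {
        StoreFile.Reader reader =
            new StoreFile.Reader(fs, hfilePath, null, false, true);
        try {
          Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
          if (fileInfo.get(StoreFile.TIMERANGE_KEY) == null) {
            throw new IOException(hfilePath + " has no TIMERANGE metadata; "
                + "timestamp-restricted scans cannot skip it");
          }
        } finally {
          reader.close();
        }
      }
    }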
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java?rev=1181566&r1=1181565&r2=1181566&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java Tue Oct 11 02:20:53 2011
@@ -121,11 +121,52 @@ public final class BloomFilterFactory {
     return conf.getBoolean(IO_STOREFILE_BLOOM_ENABLED, true);
   }
 
+  /**
+   * @return the Bloom filter error rate in the given configuration
+   */
   public static float getErrorRate(Configuration conf) {
     return conf.getFloat(IO_STOREFILE_BLOOM_ERROR_RATE, (float) 0.01);
   }
 
   /**
+   * @return the value for Bloom filter max fold in the given configuration
+   */
+  public static int getMaxFold(Configuration conf) {
+    return conf.getInt(IO_STOREFILE_BLOOM_MAX_FOLD, MAX_ALLOWED_FOLD_FACTOR);
+  }
+
+  /** @return the compound Bloom filter block size from the configuration */
+  public static int getBloomBlockSize(Configuration conf) {
+    return conf.getInt(IO_STOREFILE_BLOOM_BLOCK_SIZE, 128 * 1024);
+  }
+
+  /** @return whether to cache compound Bloom filter chunks on write */
+  public static boolean cacheChunksOnWrite(Configuration conf) {
+    return conf.getBoolean(IO_STOREFILE_BLOOM_CACHE_ON_WRITE, false);
+  }
+
+  /**
+   * @return max key for the Bloom filter from the configuration
+   */
+  public static int getMaxKeys(Configuration conf) {
+    return conf.getInt(IO_STOREFILE_BLOOM_MAX_KEYS, 128 * 1000 * 1000);
+  }
+
+  /**
+   * Copy the BloomFilter related configuration from fromConf to toConf
+   * @param fromConf conf we will copy from
+   * @param toConf conf we will copy to
+   */
+  public static void copyBloomFilterConf(Configuration fromConf, Configuration toConf) {
+    toConf.setBoolean(IO_STOREFILE_BLOOM_ENABLED, isBloomEnabled(fromConf));
+    toConf.setFloat(IO_STOREFILE_BLOOM_ERROR_RATE, getErrorRate(fromConf));
+    toConf.setInt(IO_STOREFILE_BLOOM_MAX_FOLD, getMaxFold(fromConf));
+    toConf.setInt(IO_STOREFILE_BLOOM_BLOCK_SIZE, getBloomBlockSize(fromConf));
+    toConf.setBoolean(IO_STOREFILE_BLOOM_CACHE_ON_WRITE, cacheChunksOnWrite(fromConf));
+    toConf.setInt(IO_STOREFILE_BLOOM_MAX_KEYS, getMaxKeys(fromConf));
+  }
+
+  /**
    * Creates a new Bloom filter at the time of
    * {@link org.apache.hadoop.hbase.regionserver.StoreFile} writing.
    *
@@ -161,8 +202,7 @@ public final class BloomFilterFactory {
       err = (float) (1 - Math.sqrt(1 - err));
     }
 
-    int maxFold = conf.getInt(IO_STOREFILE_BLOOM_MAX_FOLD,
-        MAX_ALLOWED_FOLD_FACTOR);
+    int maxFold = getMaxFold(conf);
 
     if (HFile.getFormatVersion(conf) > HFile.MIN_FORMAT_VERSION) {
       // In case of compound Bloom filters we ignore the maxKeys hint.
@@ -175,8 +215,7 @@ public final class BloomFilterFactory {
     } else {
       // A single-block Bloom filter. Only used when testing HFile format
       // version 1.
-      int tooBig = conf.getInt(IO_STOREFILE_BLOOM_MAX_KEYS,
-          128 * 1000 * 1000);
+      int tooBig = getMaxKeys(conf);
 
       if (maxKeys <= 0) {
         LOG.warn("Invalid maximum number of keys specified: " + maxKeys
@@ -195,14 +234,5 @@ public final class BloomFilterFactory {
 
     return null;
   }
-
-  /** @return the compound Bloom filter block size from the configuration */
-  public static int getBloomBlockSize(Configuration conf) {
-    return conf.getInt(IO_STOREFILE_BLOOM_BLOCK_SIZE, 128 * 1024);
-  }
-
-  /** @return whether to cache compound Bloom filter chunks on write */
-  public static boolean cacheChunksOnWrite(Configuration conf) {
-    return conf.getBoolean(IO_STOREFILE_BLOOM_CACHE_ON_WRITE, false);
-  }
 };
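One step in the context above is worth spelling out: for ROWCOL Bloom
filters a negative row+column probe amounts to an OR of two lookups, so if
each lookup has false-positive rate p, the combined rate is 1 - (1 - p)^2;
solving for p at a target rate err gives the err = 1 - sqrt(1 - err) line.
A quick check of the arithmetic at the default 1% error rate (illustration
only, not part of the commit):

    float err = 0.01f;                           // configured overall rate
    float p = (float) (1 - Math.sqrt(1 - err));  // per-lookup rate
    System.out.println(p);                       // ~0.00501, i.e. each lookup
                                                 // is built ~2x as strict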
Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java?rev=1181566&r1=1181565&r2=1181566&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java Tue Oct 11 02:20:53 2011
@@ -56,6 +56,9 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
@@ -82,7 +85,8 @@ public class TestHFileOutputFormat {
   private static final byte[][] FAMILIES
     = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
-      , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
+      , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))
+      , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-C"))};
 
   private static final byte[] TABLE_NAME = Bytes.toBytes("TestTable");
 
   private HBaseTestingUtility util = new HBaseTestingUtility();
@@ -552,6 +556,105 @@ public class TestHFileOutputFormat {
     return supportedAlgos.toArray(new Compression.Algorithm[0]);
   }
 
+  private void setupColumnFamiliesBloomType(HTable table,
+      Map<String, BloomType> familyToBloom) throws IOException
+  {
+    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+    for (Entry<String, BloomType> entry : familyToBloom.entrySet()) {
+      mockTableDescriptor.addFamily(
+          new HColumnDescriptor(entry.getKey().getBytes(), 1,
+              Compression.Algorithm.NONE.getName(), false,
+              false, 0, entry.getValue().toString()));
+    }
+    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
+  }
+
+  /**
+   * Test that {@link HFileOutputFormat} RecordWriter uses bloomfilter settings
+   * from the column family descriptor
+   */
+  @Test
+  public void testColumnFamilyBloomFilter()
+      throws IOException, InterruptedException {
+    Configuration conf = new Configuration(this.util.getConfiguration());
+    RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
+    TaskAttemptContext context = null;
+    Path dir =
+        HBaseTestingUtility.getTestDir("testColumnFamilyBloomFilter");
+
+    HTable table = Mockito.mock(HTable.class);
+
+    Map<String, BloomType> configuredBloomFilter =
+        new HashMap<String, BloomType>();
+    BloomType[] bloomTypeValues = BloomType.values();
+
+    int familyIndex = 0;
+    for (byte[] family : FAMILIES) {
+      configuredBloomFilter.put(Bytes.toString(family),
+          bloomTypeValues[familyIndex++ % bloomTypeValues.length]);
+    }
+
+    setupColumnFamiliesBloomType(table, configuredBloomFilter);
+
+    // set up the table to return some mock keys
+    setupMockStartKeys(table);
+
+    try {
+      // partial map red setup to get an operational writer for testing
+      Job job = new Job(conf, "testLocalMRIncrementalLoad");
+      setupRandomGeneratorMapper(job);
+      HFileOutputFormat.configureIncrementalLoad(job, table);
+      FileOutputFormat.setOutputPath(job, dir);
+      context = new TaskAttemptContext(job.getConfiguration(),
+          new TaskAttemptID());
+      HFileOutputFormat hof = new HFileOutputFormat();
+      writer = hof.getRecordWriter(context);
+
+      // write out random rows
+      writeRandomKeyValues(writer, context, ROWSPERSPLIT);
+      writer.close(context);
+
+      // Make sure that a directory was created for every CF
+      FileSystem fileSystem = dir.getFileSystem(conf);
+
+      // commit so that the filesystem has one directory per column family
+      hof.getOutputCommitter(context).commitTask(context);
+      for (byte[] family : FAMILIES) {
+        String familyStr = new String(family);
+        boolean found = false;
+        for (FileStatus f : fileSystem.listStatus(dir)) {
+
+          if (Bytes.toString(family).equals(f.getPath().getName())) {
+            // we found a matching directory
+            found = true;
+
+            // verify that the bloomfilter type on this file matches the
+            // configured bloom type.
+            Path dataFilePath = fileSystem.listStatus(f.getPath())[0].getPath();
+            StoreFile.Reader reader = new StoreFile.Reader(fileSystem,
+                dataFilePath, null, false, true);
+            Map<byte[], byte[]> metadataMap = reader.loadFileInfo();
+
+            assertTrue("timeRange is not set",
+                metadataMap.get(StoreFile.TIMERANGE_KEY) != null);
+            assertEquals("Incorrect bloom type used for column family "
+                + familyStr + "(reader: " + reader + ")",
+                configuredBloomFilter.get(familyStr),
+                reader.getBloomFilterType());
+            break;
+          }
+        }
+
+        if (!found) {
+          fail("HFile for column family " + familyStr + " not found");
+        }
+      }
+
+    } finally {
+      dir.getFileSystem(conf).delete(dir, true);
+    }
+  }
+
   /**
    * Write random values to the writer assuming a table created using