Subject: svn commit: r1570714 - in /hbase/branches/0.96/hbase-server/src: main/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/mapreduce/
Date: Fri, 21 Feb 2014 21:29:13 -0000
To: commits@hbase.apache.org
From: jxiang@apache.org

Author: jxiang
Date: Fri Feb 21 21:29:12 2014
New Revision: 1570714

URL: http://svn.apache.org/r1570714
Log:
HBASE-10526 Using Cell instead of KeyValue in HFileOutputFormat

Added:
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java   (with props)
    hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java   (with props)
Modified:
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1570714&r1=1570713&r2=1570714&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Fri Feb 21 21:29:12 2014
@@ -19,51 +19,21 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; -import java.net.URLEncoder; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.UUID; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import 
org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; -import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; -import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; -import org.apache.hadoop.hbase.regionserver.BloomType; -import org.apache.hadoop.hbase.regionserver.HStore; -import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; /** * Writes HFiles. Passed KeyValues must arrive in order. @@ -74,237 +44,17 @@ import org.apache.hadoop.mapreduce.lib.p * Using this class as part of a MapReduce job is best done * using {@link #configureIncrementalLoad(Job, HTable)}. * @see KeyValueSortReducer + * @deprecated use {@link HFileOutputFormat2} instead. */ +@Deprecated @InterfaceAudience.Public @InterfaceStability.Stable public class HFileOutputFormat extends FileOutputFormat { static Log LOG = LogFactory.getLog(HFileOutputFormat.class); - static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression"; - private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype"; - private static final String DATABLOCK_ENCODING_CONF_KEY = - "hbase.mapreduce.hfileoutputformat.datablock.encoding"; - private static final String BLOCK_SIZE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize"; public RecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException { - // Get the path of the temporary output file - final Path outputPath = FileOutputFormat.getOutputPath(context); - final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath(); - final Configuration conf = context.getConfiguration(); - final FileSystem fs = outputdir.getFileSystem(conf); - // These configs. are from hbase-*.xml - final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, - HConstants.DEFAULT_MAX_FILE_SIZE); - // Invented config. Add to hbase-*.xml if other than default compression. 
- final String defaultCompression = conf.get("hfile.compression", - Compression.Algorithm.NONE.getName()); - final boolean compactionExclude = conf.getBoolean( - "hbase.mapreduce.hfileoutputformat.compaction.exclude", false); - - // create a map from column family to the compression algorithm - final Map compressionMap = createFamilyCompressionMap(conf); - final Map bloomTypeMap = createFamilyBloomMap(conf); - final Map blockSizeMap = createFamilyBlockSizeMap(conf); - - String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY); - final HFileDataBlockEncoder encoder; - if (dataBlockEncodingStr == null) { - encoder = NoOpDataBlockEncoder.INSTANCE; - } else { - try { - encoder = new HFileDataBlockEncoderImpl(DataBlockEncoding - .valueOf(dataBlockEncodingStr)); - } catch (IllegalArgumentException ex) { - throw new RuntimeException( - "Invalid data block encoding type configured for the param " - + DATABLOCK_ENCODING_CONF_KEY + " : " + dataBlockEncodingStr); - } - } - - return new RecordWriter() { - // Map of families to writers and how much has been output on the writer. - private final Map writers = - new TreeMap(Bytes.BYTES_COMPARATOR); - private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY; - private final byte [] now = Bytes.toBytes(System.currentTimeMillis()); - private boolean rollRequested = false; - - public void write(ImmutableBytesWritable row, KeyValue kv) - throws IOException { - // null input == user explicitly wants to flush - if (row == null && kv == null) { - rollWriters(); - return; - } - - byte [] rowKey = kv.getRow(); - long length = kv.getLength(); - byte [] family = kv.getFamily(); - WriterLength wl = this.writers.get(family); - - // If this is a new column family, verify that the directory exists - if (wl == null) { - fs.mkdirs(new Path(outputdir, Bytes.toString(family))); - } - - // If any of the HFiles for the column families has reached - // maxsize, we need to roll all the writers - if (wl != null && wl.written + length >= maxsize) { - this.rollRequested = true; - } - - // This can only happen once a row is finished though - if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) { - rollWriters(); - } - - // create a new HLog writer, if necessary - if (wl == null || wl.writer == null) { - wl = getNewWriter(family, conf); - } - - // we now have the proper HLog writer. full steam ahead - kv.updateLatestStamp(this.now); - wl.writer.append(kv); - wl.written += length; - - // Copy the row so we know when a row transition. - this.previousRow = rowKey; - } - - private void rollWriters() throws IOException { - for (WriterLength wl : this.writers.values()) { - if (wl.writer != null) { - LOG.info("Writer=" + wl.writer.getPath() + - ((wl.written == 0)? "": ", wrote=" + wl.written)); - close(wl.writer); - } - wl.writer = null; - wl.written = 0; - } - this.rollRequested = false; - } - - /* Create a new StoreFile.Writer. - * @param family - * @return A WriterLength, containing a new StoreFile.Writer. - * @throws IOException - */ - private WriterLength getNewWriter(byte[] family, Configuration conf) - throws IOException { - WriterLength wl = new WriterLength(); - Path familydir = new Path(outputdir, Bytes.toString(family)); - String compression = compressionMap.get(family); - compression = compression == null ? 
defaultCompression : compression; - String bloomTypeStr = bloomTypeMap.get(family); - BloomType bloomType = BloomType.NONE; - if (bloomTypeStr != null) { - bloomType = BloomType.valueOf(bloomTypeStr); - } - String blockSizeString = blockSizeMap.get(family); - int blockSize = blockSizeString == null ? HConstants.DEFAULT_BLOCKSIZE - : Integer.parseInt(blockSizeString); - Configuration tempConf = new Configuration(conf); - tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); - wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blockSize) - .withOutputDir(familydir) - .withCompression(AbstractHFileWriter.compressionByName(compression)) - .withBloomType(bloomType) - .withComparator(KeyValue.COMPARATOR) - .withDataBlockEncoder(encoder) - .withChecksumType(HStore.getChecksumType(conf)) - .withBytesPerChecksum(HStore.getBytesPerChecksum(conf)) - .build(); - - this.writers.put(family, wl); - return wl; - } - - private void close(final StoreFile.Writer w) throws IOException { - if (w != null) { - w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, - Bytes.toBytes(System.currentTimeMillis())); - w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, - Bytes.toBytes(context.getTaskAttemptID().toString())); - w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, - Bytes.toBytes(true)); - w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, - Bytes.toBytes(compactionExclude)); - w.appendTrackedTimestampsToMetadata(); - w.close(); - } - } - - public void close(TaskAttemptContext c) - throws IOException, InterruptedException { - for (WriterLength wl: this.writers.values()) { - close(wl.writer); - } - } - }; - } - - /* - * Data structure to hold a Writer and amount of data written on it. - */ - static class WriterLength { - long written = 0; - StoreFile.Writer writer = null; - } - - /** - * Return the start keys of all of the regions in this table, - * as a list of ImmutableBytesWritable. - */ - private static List getRegionStartKeys(HTable table) - throws IOException { - byte[][] byteKeys = table.getStartKeys(); - ArrayList ret = - new ArrayList(byteKeys.length); - for (byte[] byteKey : byteKeys) { - ret.add(new ImmutableBytesWritable(byteKey)); - } - return ret; - } - - /** - * Write out a {@link SequenceFile} that can be read by - * {@link TotalOrderPartitioner} that contains the split points in startKeys. - */ - private static void writePartitions(Configuration conf, Path partitionsPath, - List startKeys) throws IOException { - LOG.info("Writing partition information to " + partitionsPath); - if (startKeys.isEmpty()) { - throw new IllegalArgumentException("No regions passed"); - } - - // We're generating a list of split points, and we don't ever - // have keys < the first region (which has an empty start key) - // so we need to remove it. Otherwise we would end up with an - // empty reducer with index 0 - TreeSet sorted = - new TreeSet(startKeys); - - ImmutableBytesWritable first = sorted.first(); - if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) { - throw new IllegalArgumentException( - "First region of table should have empty start key. 
Instead has: " - + Bytes.toStringBinary(first.get())); - } - sorted.remove(first); - - // Write the actual file - FileSystem fs = partitionsPath.getFileSystem(conf); - SequenceFile.Writer writer = SequenceFile.createWriter(fs, - conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class); - - try { - for (ImmutableBytesWritable startKey : sorted) { - writer.append(startKey, NullWritable.get()); - } - } finally { - writer.close(); - } + return HFileOutputFormat2.createRecordWriter(context); } /** @@ -323,68 +73,7 @@ public class HFileOutputFormat extends F */ public static void configureIncrementalLoad(Job job, HTable table) throws IOException { - Configuration conf = job.getConfiguration(); - - job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(KeyValue.class); - job.setOutputFormatClass(HFileOutputFormat.class); - - // Based on the configured map output class, set the correct reducer to properly - // sort the incoming values. - // TODO it would be nice to pick one or the other of these formats. - if (KeyValue.class.equals(job.getMapOutputValueClass())) { - job.setReducerClass(KeyValueSortReducer.class); - } else if (Put.class.equals(job.getMapOutputValueClass())) { - job.setReducerClass(PutSortReducer.class); - } else if (Text.class.equals(job.getMapOutputValueClass())) { - job.setReducerClass(TextSortReducer.class); - } else { - LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); - } - - conf.setStrings("io.serializations", conf.get("io.serializations"), - MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); - - // Use table's region boundaries for TOP split points. - LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName())); - List startKeys = getRegionStartKeys(table); - LOG.info("Configuring " + startKeys.size() + " reduce partitions " + - "to match current region count"); - job.setNumReduceTasks(startKeys.size()); - - configurePartitioner(job, startKeys); - // Set compression algorithms based on column families - configureCompression(table, conf); - configureBloomType(table, conf); - configureBlockSize(table, conf); - - TableMapReduceUtil.addDependencyJars(job); - TableMapReduceUtil.initCredentials(job); - LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured."); - } - - private static void configureBlockSize(HTable table, Configuration conf) throws IOException { - StringBuilder blockSizeConfigValue = new StringBuilder(); - HTableDescriptor tableDescriptor = table.getTableDescriptor(); - if(tableDescriptor == null){ - // could happen with mock table instance - return; - } - Collection families = tableDescriptor.getFamilies(); - int i = 0; - for (HColumnDescriptor familyDescriptor : families) { - if (i++ > 0) { - blockSizeConfigValue.append('&'); - } - blockSizeConfigValue.append(URLEncoder.encode( - familyDescriptor.getNameAsString(), "UTF-8")); - blockSizeConfigValue.append('='); - blockSizeConfigValue.append(URLEncoder.encode( - String.valueOf(familyDescriptor.getBlocksize()), "UTF-8")); - } - // Get rid of the last ampersand - conf.set(BLOCK_SIZE_CONF_KEY, blockSizeConfigValue.toString()); + HFileOutputFormat2.configureIncrementalLoad(job, table, HFileOutputFormat.class); } /** @@ -398,41 +87,7 @@ public class HFileOutputFormat extends F * algorithm */ static Map createFamilyCompressionMap(Configuration conf) { - return createFamilyConfValueMap(conf, COMPRESSION_CONF_KEY); - } - - private 
static Map createFamilyBloomMap(Configuration conf) { - return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY); - } - - private static Map createFamilyBlockSizeMap(Configuration conf) { - return createFamilyConfValueMap(conf, BLOCK_SIZE_CONF_KEY); - } - - /** - * Run inside the task to deserialize column family to given conf value map. - * - * @param conf - * @param confName - * @return a map of column family to the given configuration value - */ - private static Map createFamilyConfValueMap(Configuration conf, String confName) { - Map confValMap = new TreeMap(Bytes.BYTES_COMPARATOR); - String confVal = conf.get(confName, ""); - for (String familyConf : confVal.split("&")) { - String[] familySplit = familyConf.split("="); - if (familySplit.length != 2) { - continue; - } - try { - confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(), - URLDecoder.decode(familySplit[1], "UTF-8")); - } catch (UnsupportedEncodingException e) { - // will not happen with UTF-8 encoding - throw new AssertionError(e); - } - } - return confValMap; + return HFileOutputFormat2.createFamilyCompressionMap(conf); } /** @@ -441,17 +96,7 @@ public class HFileOutputFormat extends F */ static void configurePartitioner(Job job, List splitPoints) throws IOException { - - // create the partitions file - FileSystem fs = FileSystem.get(job.getConfiguration()); - Path partitionsPath = new Path("/tmp", "partitions_" + UUID.randomUUID()); - fs.makeQualified(partitionsPath); - fs.deleteOnExit(partitionsPath); - writePartitions(job.getConfiguration(), partitionsPath, splitPoints); - - // configure job to use it - job.setPartitionerClass(TotalOrderPartitioner.class); - TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionsPath); + HFileOutputFormat2.configurePartitioner(job, splitPoints); } /** @@ -466,24 +111,7 @@ public class HFileOutputFormat extends F @edu.umd.cs.findbugs.annotations.SuppressWarnings( value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") static void configureCompression(HTable table, Configuration conf) throws IOException { - StringBuilder compressionConfigValue = new StringBuilder(); - HTableDescriptor tableDescriptor = table.getTableDescriptor(); - if(tableDescriptor == null){ - // could happen with mock table instance - return; - } - Collection families = tableDescriptor.getFamilies(); - int i = 0; - for (HColumnDescriptor familyDescriptor : families) { - if (i++ > 0) { - compressionConfigValue.append('&'); - } - compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8")); - compressionConfigValue.append('='); - compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8")); - } - // Get rid of the last ampersand - conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString()); + HFileOutputFormat2.configureCompression(table, conf); } /** @@ -494,26 +122,6 @@ public class HFileOutputFormat extends F * on failure to read column family descriptors */ static void configureBloomType(HTable table, Configuration conf) throws IOException { - HTableDescriptor tableDescriptor = table.getTableDescriptor(); - if (tableDescriptor == null) { - // could happen with mock table instance - return; - } - StringBuilder bloomTypeConfigValue = new StringBuilder(); - Collection families = tableDescriptor.getFamilies(); - int i = 0; - for (HColumnDescriptor familyDescriptor : families) { - if (i++ > 0) { - bloomTypeConfigValue.append('&'); - } - 
bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8")); - bloomTypeConfigValue.append('='); - String bloomType = familyDescriptor.getBloomFilterType().toString(); - if (bloomType == null) { - bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER; - } - bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8")); - } - conf.set(BLOOM_TYPE_CONF_KEY, bloomTypeConfigValue.toString()); + HFileOutputFormat2.configureBloomType(table, conf); } } Added: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java?rev=1570714&view=auto ============================================================================== --- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java (added) +++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java Fri Feb 21 21:29:12 2014 @@ -0,0 +1,535 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; +import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; +import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; + +/** + * Writes HFiles. Passed Cells must arrive in order. + * Writes current time as the sequence id for the file. Sets the major compacted + * attribute on created hfiles. Calling write(null,null) will forceably roll + * all HFiles being written. + *
+ * <p>
+ * Using this class as part of a MapReduce job is best done + * using {@link #configureIncrementalLoad(Job, HTable)}. + * @see KeyValueSortReducer + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class HFileOutputFormat2 extends FileOutputFormat { + static Log LOG = LogFactory.getLog(HFileOutputFormat2.class); + static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression"; + private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype"; + private static final String DATABLOCK_ENCODING_CONF_KEY = + "hbase.mapreduce.hfileoutputformat.datablock.encoding"; + private static final String BLOCK_SIZE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize"; + + public RecordWriter getRecordWriter(final TaskAttemptContext context) + throws IOException, InterruptedException { + return createRecordWriter(context); + } + + static RecordWriter + createRecordWriter(final TaskAttemptContext context) + throws IOException, InterruptedException { + // Get the path of the temporary output file + final Path outputPath = FileOutputFormat.getOutputPath(context); + final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath(); + final Configuration conf = context.getConfiguration(); + final FileSystem fs = outputdir.getFileSystem(conf); + // These configs. are from hbase-*.xml + final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, + HConstants.DEFAULT_MAX_FILE_SIZE); + // Invented config. Add to hbase-*.xml if other than default compression. + final String defaultCompression = conf.get("hfile.compression", + Compression.Algorithm.NONE.getName()); + final boolean compactionExclude = conf.getBoolean( + "hbase.mapreduce.hfileoutputformat.compaction.exclude", false); + + // create a map from column family to the compression algorithm + final Map compressionMap = createFamilyCompressionMap(conf); + final Map bloomTypeMap = createFamilyBloomMap(conf); + final Map blockSizeMap = createFamilyBlockSizeMap(conf); + + String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY); + final HFileDataBlockEncoder encoder; + if (dataBlockEncodingStr == null) { + encoder = NoOpDataBlockEncoder.INSTANCE; + } else { + try { + encoder = new HFileDataBlockEncoderImpl(DataBlockEncoding + .valueOf(dataBlockEncodingStr)); + } catch (IllegalArgumentException ex) { + throw new RuntimeException( + "Invalid data block encoding type configured for the param " + + DATABLOCK_ENCODING_CONF_KEY + " : " + dataBlockEncodingStr); + } + } + + return new RecordWriter() { + // Map of families to writers and how much has been output on the writer. 
+ private final Map writers = + new TreeMap(Bytes.BYTES_COMPARATOR); + private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY; + private final byte [] now = Bytes.toBytes(System.currentTimeMillis()); + private boolean rollRequested = false; + + public void write(ImmutableBytesWritable row, V cell) + throws IOException { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + + // null input == user explicitly wants to flush + if (row == null && kv == null) { + rollWriters(); + return; + } + + byte [] rowKey = kv.getRow(); + long length = kv.getLength(); + byte [] family = kv.getFamily(); + WriterLength wl = this.writers.get(family); + + // If this is a new column family, verify that the directory exists + if (wl == null) { + fs.mkdirs(new Path(outputdir, Bytes.toString(family))); + } + + // If any of the HFiles for the column families has reached + // maxsize, we need to roll all the writers + if (wl != null && wl.written + length >= maxsize) { + this.rollRequested = true; + } + + // This can only happen once a row is finished though + if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) { + rollWriters(); + } + + // create a new HLog writer, if necessary + if (wl == null || wl.writer == null) { + wl = getNewWriter(family, conf); + } + + // we now have the proper HLog writer. full steam ahead + kv.updateLatestStamp(this.now); + wl.writer.append(kv); + wl.written += length; + + // Copy the row so we know when a row transition. + this.previousRow = rowKey; + } + + private void rollWriters() throws IOException { + for (WriterLength wl : this.writers.values()) { + if (wl.writer != null) { + LOG.info("Writer=" + wl.writer.getPath() + + ((wl.written == 0)? "": ", wrote=" + wl.written)); + close(wl.writer); + } + wl.writer = null; + wl.written = 0; + } + this.rollRequested = false; + } + + /* Create a new StoreFile.Writer. + * @param family + * @return A WriterLength, containing a new StoreFile.Writer. + * @throws IOException + */ + private WriterLength getNewWriter(byte[] family, Configuration conf) + throws IOException { + WriterLength wl = new WriterLength(); + Path familydir = new Path(outputdir, Bytes.toString(family)); + String compression = compressionMap.get(family); + compression = compression == null ? defaultCompression : compression; + String bloomTypeStr = bloomTypeMap.get(family); + BloomType bloomType = BloomType.NONE; + if (bloomTypeStr != null) { + bloomType = BloomType.valueOf(bloomTypeStr); + } + String blockSizeString = blockSizeMap.get(family); + int blockSize = blockSizeString == null ? 
HConstants.DEFAULT_BLOCKSIZE + : Integer.parseInt(blockSizeString); + Configuration tempConf = new Configuration(conf); + tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); + wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blockSize) + .withOutputDir(familydir) + .withCompression(AbstractHFileWriter.compressionByName(compression)) + .withBloomType(bloomType) + .withComparator(KeyValue.COMPARATOR) + .withDataBlockEncoder(encoder) + .withChecksumType(HStore.getChecksumType(conf)) + .withBytesPerChecksum(HStore.getBytesPerChecksum(conf)) + .build(); + + this.writers.put(family, wl); + return wl; + } + + private void close(final StoreFile.Writer w) throws IOException { + if (w != null) { + w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, + Bytes.toBytes(System.currentTimeMillis())); + w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, + Bytes.toBytes(context.getTaskAttemptID().toString())); + w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, + Bytes.toBytes(true)); + w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, + Bytes.toBytes(compactionExclude)); + w.appendTrackedTimestampsToMetadata(); + w.close(); + } + } + + public void close(TaskAttemptContext c) + throws IOException, InterruptedException { + for (WriterLength wl: this.writers.values()) { + close(wl.writer); + } + } + }; + } + + /* + * Data structure to hold a Writer and amount of data written on it. + */ + static class WriterLength { + long written = 0; + StoreFile.Writer writer = null; + } + + /** + * Return the start keys of all of the regions in this table, + * as a list of ImmutableBytesWritable. + */ + private static List getRegionStartKeys(HTable table) + throws IOException { + byte[][] byteKeys = table.getStartKeys(); + ArrayList ret = + new ArrayList(byteKeys.length); + for (byte[] byteKey : byteKeys) { + ret.add(new ImmutableBytesWritable(byteKey)); + } + return ret; + } + + /** + * Write out a {@link SequenceFile} that can be read by + * {@link TotalOrderPartitioner} that contains the split points in startKeys. + */ + private static void writePartitions(Configuration conf, Path partitionsPath, + List startKeys) throws IOException { + LOG.info("Writing partition information to " + partitionsPath); + if (startKeys.isEmpty()) { + throw new IllegalArgumentException("No regions passed"); + } + + // We're generating a list of split points, and we don't ever + // have keys < the first region (which has an empty start key) + // so we need to remove it. Otherwise we would end up with an + // empty reducer with index 0 + TreeSet sorted = + new TreeSet(startKeys); + + ImmutableBytesWritable first = sorted.first(); + if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) { + throw new IllegalArgumentException( + "First region of table should have empty start key. Instead has: " + + Bytes.toStringBinary(first.get())); + } + sorted.remove(first); + + // Write the actual file + FileSystem fs = partitionsPath.getFileSystem(conf); + SequenceFile.Writer writer = SequenceFile.createWriter(fs, + conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class); + + try { + for (ImmutableBytesWritable startKey : sorted) { + writer.append(startKey, NullWritable.get()); + } + } finally { + writer.close(); + } + } + + /** + * Configure a MapReduce Job to perform an incremental load into the given + * table. This + *

+ * <ul>
+ *   <li>Inspects the table to configure a total order partitioner</li>
+ *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
+ *   <li>Sets the number of reduce tasks to match the current number of regions</li>
+ *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
+ *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
+ *   PutSortReducer)</li>
+ * </ul>
+ * The user should be sure to set the map output value class to either KeyValue or Put before + * running this function. + */ + public static void configureIncrementalLoad(Job job, HTable table) + throws IOException { + configureIncrementalLoad(job, table, HFileOutputFormat2.class); + } + + static void configureIncrementalLoad(Job job, HTable table, + Class> cls) throws IOException { + Configuration conf = job.getConfiguration(); + + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(KeyValue.class); + job.setOutputFormatClass(HFileOutputFormat2.class); + + // Based on the configured map output class, set the correct reducer to properly + // sort the incoming values. + // TODO it would be nice to pick one or the other of these formats. + if (KeyValue.class.equals(job.getMapOutputValueClass())) { + job.setReducerClass(KeyValueSortReducer.class); + } else if (Put.class.equals(job.getMapOutputValueClass())) { + job.setReducerClass(PutSortReducer.class); + } else if (Text.class.equals(job.getMapOutputValueClass())) { + job.setReducerClass(TextSortReducer.class); + } else { + LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); + } + + conf.setStrings("io.serializations", conf.get("io.serializations"), + MutationSerialization.class.getName(), ResultSerialization.class.getName(), + KeyValueSerialization.class.getName()); + + // Use table's region boundaries for TOP split points. + LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName())); + List startKeys = getRegionStartKeys(table); + LOG.info("Configuring " + startKeys.size() + " reduce partitions " + + "to match current region count"); + job.setNumReduceTasks(startKeys.size()); + + configurePartitioner(job, startKeys); + // Set compression algorithms based on column families + configureCompression(table, conf); + configureBloomType(table, conf); + configureBlockSize(table, conf); + + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.initCredentials(job); + LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured."); + } + + private static void configureBlockSize(HTable table, Configuration conf) throws IOException { + StringBuilder blockSizeConfigValue = new StringBuilder(); + HTableDescriptor tableDescriptor = table.getTableDescriptor(); + if(tableDescriptor == null){ + // could happen with mock table instance + return; + } + Collection families = tableDescriptor.getFamilies(); + int i = 0; + for (HColumnDescriptor familyDescriptor : families) { + if (i++ > 0) { + blockSizeConfigValue.append('&'); + } + blockSizeConfigValue.append(URLEncoder.encode( + familyDescriptor.getNameAsString(), "UTF-8")); + blockSizeConfigValue.append('='); + blockSizeConfigValue.append(URLEncoder.encode( + String.valueOf(familyDescriptor.getBlocksize()), "UTF-8")); + } + // Get rid of the last ampersand + conf.set(BLOCK_SIZE_CONF_KEY, blockSizeConfigValue.toString()); + } + + /** + * Run inside the task to deserialize column family to compression algorithm + * map from the + * configuration. + * + * Package-private for unit tests only. 
+ * + * @return a map from column family to the name of the configured compression + * algorithm + */ + static Map createFamilyCompressionMap(Configuration conf) { + return createFamilyConfValueMap(conf, COMPRESSION_CONF_KEY); + } + + private static Map createFamilyBloomMap(Configuration conf) { + return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY); + } + + private static Map createFamilyBlockSizeMap(Configuration conf) { + return createFamilyConfValueMap(conf, BLOCK_SIZE_CONF_KEY); + } + + /** + * Run inside the task to deserialize column family to given conf value map. + * + * @param conf + * @param confName + * @return a map of column family to the given configuration value + */ + private static Map createFamilyConfValueMap(Configuration conf, String confName) { + Map confValMap = new TreeMap(Bytes.BYTES_COMPARATOR); + String confVal = conf.get(confName, ""); + for (String familyConf : confVal.split("&")) { + String[] familySplit = familyConf.split("="); + if (familySplit.length != 2) { + continue; + } + try { + confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(), + URLDecoder.decode(familySplit[1], "UTF-8")); + } catch (UnsupportedEncodingException e) { + // will not happen with UTF-8 encoding + throw new AssertionError(e); + } + } + return confValMap; + } + + /** + * Configure job with a TotalOrderPartitioner, partitioning against + * splitPoints. Cleans up the partitions file after job exists. + */ + static void configurePartitioner(Job job, List splitPoints) + throws IOException { + + // create the partitions file + FileSystem fs = FileSystem.get(job.getConfiguration()); + Path partitionsPath = new Path("/tmp", "partitions_" + UUID.randomUUID()); + fs.makeQualified(partitionsPath); + fs.deleteOnExit(partitionsPath); + writePartitions(job.getConfiguration(), partitionsPath, splitPoints); + + // configure job to use it + job.setPartitionerClass(TotalOrderPartitioner.class); + TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionsPath); + } + + /** + * Serialize column family to compression algorithm map to configuration. + * Invoked while configuring the MR job for incremental load. + * + * Package-private for unit tests only. + * + * @throws IOException + * on failure to read column family descriptors + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings( + value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") + static void configureCompression(HTable table, Configuration conf) throws IOException { + StringBuilder compressionConfigValue = new StringBuilder(); + HTableDescriptor tableDescriptor = table.getTableDescriptor(); + if(tableDescriptor == null){ + // could happen with mock table instance + return; + } + Collection families = tableDescriptor.getFamilies(); + int i = 0; + for (HColumnDescriptor familyDescriptor : families) { + if (i++ > 0) { + compressionConfigValue.append('&'); + } + compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8")); + compressionConfigValue.append('='); + compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8")); + } + // Get rid of the last ampersand + conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString()); + } + + /** + * Serialize column family to bloom type map to configuration. + * Invoked while configuring the MR job for incremental load. 
+ * + * @throws IOException + * on failure to read column family descriptors + */ + static void configureBloomType(HTable table, Configuration conf) throws IOException { + HTableDescriptor tableDescriptor = table.getTableDescriptor(); + if (tableDescriptor == null) { + // could happen with mock table instance + return; + } + StringBuilder bloomTypeConfigValue = new StringBuilder(); + Collection families = tableDescriptor.getFamilies(); + int i = 0; + for (HColumnDescriptor familyDescriptor : families) { + if (i++ > 0) { + bloomTypeConfigValue.append('&'); + } + bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8")); + bloomTypeConfigValue.append('='); + String bloomType = familyDescriptor.getBloomFilterType().toString(); + if (bloomType == null) { + bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER; + } + bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8")); + } + conf.set(BLOOM_TYPE_CONF_KEY, bloomTypeConfigValue.toString()); + } +} Propchange: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java ------------------------------------------------------------------------------ svn:eol-style = native Added: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java?rev=1570714&view=auto ============================================================================== --- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java (added) +++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java Fri Feb 21 21:29:12 2014 @@ -0,0 +1,872 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.concurrent.Callable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HadoopShims; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.PerformanceEvaluation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFile.Reader; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import com.google.common.collect.Lists; + +/** + * Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat2}. + * Sets up and runs a mapreduce job that writes hfile output. + * Creates a few inner classes to implement splits and an inputformat that + * emits keys and values like those of {@link PerformanceEvaluation}. 
+ */ +@Category(LargeTests.class) +public class TestHFileOutputFormat2 { + private final static int ROWSPERSPLIT = 1024; + + private static final byte[][] FAMILIES + = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")) + , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))}; + private static final TableName TABLE_NAME = + TableName.valueOf("TestTable"); + + private HBaseTestingUtility util = new HBaseTestingUtility(); + + private static Log LOG = LogFactory.getLog(TestHFileOutputFormat2.class); + + /** + * Simple mapper that makes KeyValue output. + */ + static class RandomKVGeneratingMapper + extends Mapper { + + private int keyLength; + private static final int KEYLEN_DEFAULT=10; + private static final String KEYLEN_CONF="randomkv.key.length"; + + private int valLength; + private static final int VALLEN_DEFAULT=10; + private static final String VALLEN_CONF="randomkv.val.length"; + + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + super.setup(context); + + Configuration conf = context.getConfiguration(); + keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); + valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); + } + + protected void map( + NullWritable n1, NullWritable n2, + Mapper.Context context) + throws java.io.IOException ,InterruptedException + { + + byte keyBytes[] = new byte[keyLength]; + byte valBytes[] = new byte[valLength]; + + int taskId = context.getTaskAttemptID().getTaskID().getId(); + assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; + + Random random = new Random(); + for (int i = 0; i < ROWSPERSPLIT; i++) { + + random.nextBytes(keyBytes); + // Ensure that unique tasks generate unique keys + keyBytes[keyLength - 1] = (byte)(taskId & 0xFF); + random.nextBytes(valBytes); + ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); + + for (byte[] family : TestHFileOutputFormat2.FAMILIES) { + KeyValue kv = new KeyValue(keyBytes, family, + PerformanceEvaluation.QUALIFIER_NAME, valBytes); + context.write(key, kv); + } + } + } + } + + private void setupRandomGeneratorMapper(Job job) { + job.setInputFormatClass(NMapInputFormat.class); + job.setMapperClass(RandomKVGeneratingMapper.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); + } + + /** + * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if + * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}. + * @see HBASE-2615 + */ + @Test + public void test_LATEST_TIMESTAMP_isReplaced() + throws Exception { + Configuration conf = new Configuration(this.util.getConfiguration()); + RecordWriter writer = null; + TaskAttemptContext context = null; + Path dir = + util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced"); + try { + Job job = new Job(conf); + FileOutputFormat.setOutputPath(job, dir); + context = createTestTaskAttemptContext(job); + HFileOutputFormat2 hof = new HFileOutputFormat2(); + writer = hof.getRecordWriter(context); + final byte [] b = Bytes.toBytes("b"); + + // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be + // changed by call to write. Check all in kv is same but ts. 
+ KeyValue kv = new KeyValue(b, b, b); + KeyValue original = kv.clone(); + writer.write(new ImmutableBytesWritable(), kv); + assertFalse(original.equals(kv)); + assertTrue(Bytes.equals(original.getRow(), kv.getRow())); + assertTrue(original.matchingColumn(kv.getFamily(), kv.getQualifier())); + assertNotSame(original.getTimestamp(), kv.getTimestamp()); + assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp()); + + // Test 2. Now test passing a kv that has explicit ts. It should not be + // changed by call to record write. + kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b); + original = kv.clone(); + writer.write(new ImmutableBytesWritable(), kv); + assertTrue(original.equals(kv)); + } finally { + if (writer != null && context != null) writer.close(context); + dir.getFileSystem(conf).delete(dir, true); + } + } + + private TaskAttemptContext createTestTaskAttemptContext(final Job job) + throws IOException, Exception { + HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class); + TaskAttemptContext context = hadoop.createTestTaskAttemptContext(job, "attempt_200707121733_0001_m_000000_0"); + return context; + } + + /* + * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE + * metadata used by time-restricted scans. + */ + @Test + public void test_TIMERANGE() throws Exception { + Configuration conf = new Configuration(this.util.getConfiguration()); + RecordWriter writer = null; + TaskAttemptContext context = null; + Path dir = + util.getDataTestDir("test_TIMERANGE_present"); + LOG.info("Timerange dir writing to dir: "+ dir); + try { + // build a record writer using HFileOutputFormat2 + Job job = new Job(conf); + FileOutputFormat.setOutputPath(job, dir); + context = createTestTaskAttemptContext(job); + HFileOutputFormat2 hof = new HFileOutputFormat2(); + writer = hof.getRecordWriter(context); + + // Pass two key values with explicit times stamps + final byte [] b = Bytes.toBytes("b"); + + // value 1 with timestamp 2000 + KeyValue kv = new KeyValue(b, b, b, 2000, b); + KeyValue original = kv.clone(); + writer.write(new ImmutableBytesWritable(), kv); + assertEquals(original,kv); + + // value 2 with timestamp 1000 + kv = new KeyValue(b, b, b, 1000, b); + original = kv.clone(); + writer.write(new ImmutableBytesWritable(), kv); + assertEquals(original, kv); + + // verify that the file has the proper FileInfo. + writer.close(context); + + // the generated file lives 1 directory down from the attempt directory + // and is the only file, e.g. + // _attempt__0000_r_000000_0/b/1979617994050536795 + FileSystem fs = FileSystem.get(conf); + Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent(); + FileStatus[] sub1 = fs.listStatus(attemptDirectory); + FileStatus[] file = fs.listStatus(sub1[0].getPath()); + + // open as HFile Reader and pull out TIMERANGE FileInfo. + HFile.Reader rd = HFile.createReader(fs, file[0].getPath(), + new CacheConfig(conf)); + Map finfo = rd.loadFileInfo(); + byte[] range = finfo.get("TIMERANGE".getBytes()); + assertNotNull(range); + + // unmarshall and check values. + TimeRangeTracker timeRangeTracker = new TimeRangeTracker(); + Writables.copyWritable(range, timeRangeTracker); + LOG.info(timeRangeTracker.getMinimumTimestamp() + + "...." 
+ timeRangeTracker.getMaximumTimestamp()); + assertEquals(1000, timeRangeTracker.getMinimumTimestamp()); + assertEquals(2000, timeRangeTracker.getMaximumTimestamp()); + rd.close(); + } finally { + if (writer != null && context != null) writer.close(context); + dir.getFileSystem(conf).delete(dir, true); + } + } + + /** + * Run small MR job. + */ + @Test + public void testWritingPEData() throws Exception { + Configuration conf = util.getConfiguration(); + Path testDir = util.getDataTestDirOnTestFS("testWritingPEData"); + FileSystem fs = testDir.getFileSystem(conf); + + // Set down this value or we OOME in eclipse. + conf.setInt("io.sort.mb", 20); + // Write a few files. + conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); + + Job job = new Job(conf, "testWritingPEData"); + setupRandomGeneratorMapper(job); + // This partitioner doesn't work well for number keys but using it anyways + // just to demonstrate how to configure it. + byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; + byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; + + Arrays.fill(startKey, (byte)0); + Arrays.fill(endKey, (byte)0xff); + + job.setPartitionerClass(SimpleTotalOrderPartitioner.class); + // Set start and end rows for partitioner. + SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey); + SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey); + job.setReducerClass(KeyValueSortReducer.class); + job.setOutputFormatClass(HFileOutputFormat2.class); + job.setNumReduceTasks(4); + job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), + MutationSerialization.class.getName(), ResultSerialization.class.getName(), + KeyValueSerialization.class.getName()); + + FileOutputFormat.setOutputPath(job, testDir); + assertTrue(job.waitForCompletion(false)); + FileStatus [] files = fs.listStatus(testDir); + assertTrue(files.length > 0); + } + + @Test + public void testJobConfiguration() throws Exception { + Job job = new Job(util.getConfiguration()); + job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration")); + HTable table = Mockito.mock(HTable.class); + setupMockStartKeys(table); + HFileOutputFormat2.configureIncrementalLoad(job, table); + assertEquals(job.getNumReduceTasks(), 4); + } + + private byte [][] generateRandomStartKeys(int numKeys) { + Random random = new Random(); + byte[][] ret = new byte[numKeys][]; + // first region start key is always empty + ret[0] = HConstants.EMPTY_BYTE_ARRAY; + for (int i = 1; i < numKeys; i++) { + ret[i] = PerformanceEvaluation.generateValue(random); + } + return ret; + } + + @Test + public void testMRIncrementalLoad() throws Exception { + LOG.info("\nStarting test testMRIncrementalLoad\n"); + doIncrementalLoadTest(false); + } + + @Test + public void testMRIncrementalLoadWithSplit() throws Exception { + LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n"); + doIncrementalLoadTest(true); + } + + private void doIncrementalLoadTest( + boolean shouldChangeRegions) throws Exception { + util = new HBaseTestingUtility(); + Configuration conf = util.getConfiguration(); + byte[][] startKeys = generateRandomStartKeys(5); + HBaseAdmin admin = null; + try { + util.startMiniCluster(); + Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); + admin = new HBaseAdmin(conf); + HTable table = util.createTable(TABLE_NAME, FAMILIES); + assertEquals("Should start with empty table", + 0, util.countRows(table)); + int numRegions = util.createMultiRegions( + 
util.getConfiguration(), table, FAMILIES[0], startKeys); + assertEquals("Should make 5 regions", numRegions, 5); + + // Generate the bulk load files + util.startMiniMapReduceCluster(); + runIncrementalPELoad(conf, table, testDir); + // This doesn't write into the table, just makes files + assertEquals("HFOF should not touch actual table", + 0, util.countRows(table)); + + + // Make sure that a directory was created for every CF + int dir = 0; + for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) { + for (byte[] family : FAMILIES) { + if (Bytes.toString(family).equals(f.getPath().getName())) { + ++dir; + } + } + } + assertEquals("Column family not found in FS.", FAMILIES.length, dir); + + // handle the split case + if (shouldChangeRegions) { + LOG.info("Changing regions in table"); + admin.disableTable(table.getTableName()); + while(util.getMiniHBaseCluster().getMaster().getAssignmentManager(). + getRegionStates().isRegionsInTransition()) { + Threads.sleep(200); + LOG.info("Waiting on table to finish disabling"); + } + byte[][] newStartKeys = generateRandomStartKeys(15); + util.createMultiRegions( + util.getConfiguration(), table, FAMILIES[0], newStartKeys); + admin.enableTable(table.getTableName()); + while (table.getRegionLocations().size() != 15 || + !admin.isTableAvailable(table.getTableName())) { + Thread.sleep(200); + LOG.info("Waiting for new region assignment to happen"); + } + } + + // Perform the actual load + new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table); + + // Ensure data shows up + int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; + assertEquals("LoadIncrementalHFiles should put expected data in table", + expectedRows, util.countRows(table)); + Scan scan = new Scan(); + ResultScanner results = table.getScanner(scan); + for (Result res : results) { + assertEquals(FAMILIES.length, res.rawCells().length); + Cell first = res.rawCells()[0]; + for (Cell kv : res.rawCells()) { + assertTrue(CellUtil.matchingRow(first, kv)); + assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv))); + } + } + results.close(); + String tableDigestBefore = util.checksumRows(table); + + // Cause regions to reopen + admin.disableTable(TABLE_NAME); + while (!admin.isTableDisabled(TABLE_NAME)) { + Thread.sleep(200); + LOG.info("Waiting for table to disable"); + } + admin.enableTable(TABLE_NAME); + util.waitTableAvailable(TABLE_NAME.getName()); + assertEquals("Data should remain after reopening of regions", + tableDigestBefore, util.checksumRows(table)); + } finally { + if (admin != null) admin.close(); + util.shutdownMiniMapReduceCluster(); + util.shutdownMiniCluster(); + } + } + + private void runIncrementalPELoad( + Configuration conf, HTable table, Path outDir) + throws Exception { + Job job = new Job(conf, "testLocalMRIncrementalLoad"); + job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); + job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), + MutationSerialization.class.getName(), ResultSerialization.class.getName(), + KeyValueSerialization.class.getName()); + setupRandomGeneratorMapper(job); + HFileOutputFormat2.configureIncrementalLoad(job, table); + FileOutputFormat.setOutputPath(job, outDir); + + assertFalse( util.getTestFileSystem().exists(outDir)) ; + + assertEquals(table.getRegionLocations().size(), job.getNumReduceTasks()); + + assertTrue(job.waitForCompletion(true)); + } + + /** + * Test for + * {@link 
HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests + * that the compression map is correctly deserialized from configuration + * + * @throws IOException + */ + @Test + public void testCreateFamilyCompressionMap() throws IOException { + for (int numCfs = 0; numCfs <= 3; numCfs++) { + Configuration conf = new Configuration(this.util.getConfiguration()); + Map familyToCompression = getMockColumnFamilies(numCfs); + HTable table = Mockito.mock(HTable.class); + setupMockColumnFamilies(table, familyToCompression); + HFileOutputFormat2.configureCompression(table, conf); + + // read back family specific compression setting from the configuration + Map retrievedFamilyToCompressionMap = HFileOutputFormat2.createFamilyCompressionMap(conf); + + // test that we have a value for all column families that matches with the + // used mock values + for (Entry entry : familyToCompression.entrySet()) { + assertEquals("Compression configuration incorrect for column family:" + entry.getKey(), entry.getValue() + .getName(), retrievedFamilyToCompressionMap.get(entry.getKey().getBytes())); + } + } + } + + private void setupMockColumnFamilies(HTable table, + Map familyToCompression) throws IOException + { + HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME); + for (Entry entry : familyToCompression.entrySet()) { + mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) + .setMaxVersions(1) + .setCompressionType(entry.getValue()) + .setBlockCacheEnabled(false) + .setTimeToLive(0)); + } + Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); + } + + private void setupMockStartKeys(HTable table) throws IOException { + byte[][] mockKeys = new byte[][] { + HConstants.EMPTY_BYTE_ARRAY, + Bytes.toBytes("aaa"), + Bytes.toBytes("ggg"), + Bytes.toBytes("zzz") + }; + Mockito.doReturn(mockKeys).when(table).getStartKeys(); + } + + /** + * @return a map from column family names to compression algorithms for + * testing column family compression. 
Column family names have special characters + */ + private Map getMockColumnFamilies(int numCfs) { + Map familyToCompression = new HashMap(); + // use column family names having special characters + if (numCfs-- > 0) { + familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO); + } + if (numCfs-- > 0) { + familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY); + } + if (numCfs-- > 0) { + familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ); + } + if (numCfs-- > 0) { + familyToCompression.put("Family3", Compression.Algorithm.NONE); + } + return familyToCompression; + } + + /** + * Test that {@link HFileOutputFormat2} RecordWriter uses compression settings + * from the column family descriptor + */ + @Test + public void testColumnFamilyCompression() throws Exception { + Configuration conf = new Configuration(this.util.getConfiguration()); + RecordWriter writer = null; + TaskAttemptContext context = null; + Path dir = + util.getDataTestDirOnTestFS("testColumnFamilyCompression"); + + HTable table = Mockito.mock(HTable.class); + + Map configuredCompression = + new HashMap(); + Compression.Algorithm[] supportedAlgos = getSupportedCompressionAlgorithms(); + + int familyIndex = 0; + for (byte[] family : FAMILIES) { + configuredCompression.put(Bytes.toString(family), + supportedAlgos[familyIndex++ % supportedAlgos.length]); + } + setupMockColumnFamilies(table, configuredCompression); + + // set up the table to return some mock keys + setupMockStartKeys(table); + + try { + // partial map red setup to get an operational writer for testing + // We turn off the sequence file compression, because DefaultCodec + // pollutes the GZip codec pool with an incompatible compressor. + conf.set("io.seqfile.compression.type", "NONE"); + Job job = new Job(conf, "testLocalMRIncrementalLoad"); + job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilyCompression")); + setupRandomGeneratorMapper(job); + HFileOutputFormat2.configureIncrementalLoad(job, table); + FileOutputFormat.setOutputPath(job, dir); + context = createTestTaskAttemptContext(job); + HFileOutputFormat2 hof = new HFileOutputFormat2(); + writer = hof.getRecordWriter(context); + + // write out random rows + writeRandomKeyValues(writer, context, ROWSPERSPLIT); + writer.close(context); + + // Make sure that a directory was created for every CF + FileSystem fileSystem = dir.getFileSystem(conf); + + // commit so that the filesystem has one directory per column family + hof.getOutputCommitter(context).commitTask(context); + hof.getOutputCommitter(context).commitJob(context); + for (byte[] family : FAMILIES) { + String familyStr = new String(family); + boolean found = false; + for (FileStatus f : fileSystem.listStatus(dir)) { + + if (Bytes.toString(family).equals(f.getPath().getName())) { + // we found a matching directory + found = true; + + // verify that the compression on this file matches the configured + // compression + Path dataFilePath = fileSystem.listStatus(f.getPath())[0].getPath(); + Reader reader = HFile.createReader(fileSystem, dataFilePath, + new CacheConfig(conf)); + reader.loadFileInfo(); + assertEquals("Incorrect compression used for column family " + familyStr + + "(reader: " + reader + ")", + configuredCompression.get(familyStr), reader.getCompressionAlgorithm()); + break; + } + } + + if (!found) { + fail("HFile for column family " + familyStr + " not found"); + } + } + + } finally { + dir.getFileSystem(conf).delete(dir, true); + } + } + + + /** + * @return + */ + 
private Compression.Algorithm[] getSupportedCompressionAlgorithms() { + String[] allAlgos = HFile.getSupportedCompressionAlgorithms(); + List supportedAlgos = Lists.newArrayList(); + + for (String algoName : allAlgos) { + try { + Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName); + algo.getCompressor(); + supportedAlgos.add(algo); + } catch (Throwable t) { + // this algo is not available + } + } + + return supportedAlgos.toArray(new Compression.Algorithm[0]); + } + + + /** + * Write random values to the writer assuming a table created using + * {@link #FAMILIES} as column family descriptors + */ + private void writeRandomKeyValues(RecordWriter writer, TaskAttemptContext context, + int numRows) + throws IOException, InterruptedException { + byte keyBytes[] = new byte[Bytes.SIZEOF_INT]; + int valLength = 10; + byte valBytes[] = new byte[valLength]; + + int taskId = context.getTaskAttemptID().getTaskID().getId(); + assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; + + Random random = new Random(); + for (int i = 0; i < numRows; i++) { + + Bytes.putInt(keyBytes, 0, i); + random.nextBytes(valBytes); + ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); + + for (byte[] family : TestHFileOutputFormat2.FAMILIES) { + KeyValue kv = new KeyValue(keyBytes, family, + PerformanceEvaluation.QUALIFIER_NAME, valBytes); + writer.write(key, kv); + } + } + } + + /** + * This test is to test the scenario happened in HBASE-6901. + * All files are bulk loaded and excluded from minor compaction. + * Without the fix of HBASE-6901, an ArrayIndexOutOfBoundsException + * will be thrown. + */ + @Ignore ("Flakey: See HBASE-9051") @Test + public void testExcludeAllFromMinorCompaction() throws Exception { + Configuration conf = util.getConfiguration(); + conf.setInt("hbase.hstore.compaction.min", 2); + generateRandomStartKeys(5); + + try { + util.startMiniCluster(); + final FileSystem fs = util.getDFSCluster().getFileSystem(); + HBaseAdmin admin = new HBaseAdmin(conf); + HTable table = util.createTable(TABLE_NAME, FAMILIES); + assertEquals("Should start with empty table", 0, util.countRows(table)); + + // deep inspection: get the StoreFile dir + final Path storePath = HStore.getStoreHomedir( + FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME), + admin.getTableRegions(TABLE_NAME).get(0), + FAMILIES[0]); + assertEquals(0, fs.listStatus(storePath).length); + + // Generate two bulk load files + conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", + true); + util.startMiniMapReduceCluster(); + + for (int i = 0; i < 2; i++) { + Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i); + runIncrementalPELoad(conf, table, testDir); + // Perform the actual load + new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table); + } + + // Ensure data shows up + int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; + assertEquals("LoadIncrementalHFiles should put expected data in table", + expectedRows, util.countRows(table)); + + // should have a second StoreFile now + assertEquals(2, fs.listStatus(storePath).length); + + // minor compactions shouldn't get rid of the file + admin.compact(TABLE_NAME.getName()); + try { + quickPoll(new Callable() { + public Boolean call() throws Exception { + return fs.listStatus(storePath).length == 1; + } + }, 5000); + throw new IOException("SF# = " + fs.listStatus(storePath).length); + } catch (AssertionError ae) { + // this is expected behavior + } + + // a 
major compaction should work though + admin.majorCompact(TABLE_NAME.getName()); + quickPoll(new Callable() { + public Boolean call() throws Exception { + return fs.listStatus(storePath).length == 1; + } + }, 5000); + + } finally { + util.shutdownMiniMapReduceCluster(); + util.shutdownMiniCluster(); + } + } + + @Test + public void testExcludeMinorCompaction() throws Exception { + Configuration conf = util.getConfiguration(); + conf.setInt("hbase.hstore.compaction.min", 2); + generateRandomStartKeys(5); + + try { + util.startMiniCluster(); + Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction"); + final FileSystem fs = util.getDFSCluster().getFileSystem(); + HBaseAdmin admin = new HBaseAdmin(conf); + HTable table = util.createTable(TABLE_NAME, FAMILIES); + assertEquals("Should start with empty table", 0, util.countRows(table)); + + // deep inspection: get the StoreFile dir + final Path storePath = HStore.getStoreHomedir( + FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME), + admin.getTableRegions(TABLE_NAME).get(0), + FAMILIES[0]); + assertEquals(0, fs.listStatus(storePath).length); + + // put some data in it and flush to create a storefile + Put p = new Put(Bytes.toBytes("test")); + p.add(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1")); + table.put(p); + admin.flush(TABLE_NAME.getName()); + assertEquals(1, util.countRows(table)); + quickPoll(new Callable() { + public Boolean call() throws Exception { + return fs.listStatus(storePath).length == 1; + } + }, 5000); + + // Generate a bulk load file with more rows + conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", + true); + util.startMiniMapReduceCluster(); + runIncrementalPELoad(conf, table, testDir); + + // Perform the actual load + new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table); + + // Ensure data shows up + int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; + assertEquals("LoadIncrementalHFiles should put expected data in table", + expectedRows + 1, util.countRows(table)); + + // should have a second StoreFile now + assertEquals(2, fs.listStatus(storePath).length); + + // minor compactions shouldn't get rid of the file + admin.compact(TABLE_NAME.getName()); + try { + quickPoll(new Callable() { + public Boolean call() throws Exception { + return fs.listStatus(storePath).length == 1; + } + }, 5000); + throw new IOException("SF# = " + fs.listStatus(storePath).length); + } catch (AssertionError ae) { + // this is expected behavior + } + + // a major compaction should work though + admin.majorCompact(TABLE_NAME.getName()); + quickPoll(new Callable() { + public Boolean call() throws Exception { + return fs.listStatus(storePath).length == 1; + } + }, 5000); + + } finally { + util.shutdownMiniMapReduceCluster(); + util.shutdownMiniCluster(); + } + } + + private void quickPoll(Callable c, int waitMs) throws Exception { + int sleepMs = 10; + int retries = (int) Math.ceil(((double) waitMs) / sleepMs); + while (retries-- > 0) { + if (c.call().booleanValue()) { + return; + } + Thread.sleep(sleepMs); + } + fail(); + } + + public static void main(String args[]) throws Exception { + new TestHFileOutputFormat2().manualTest(args); + } + + public void manualTest(String args[]) throws Exception { + Configuration conf = HBaseConfiguration.create(); + util = new HBaseTestingUtility(conf); + if ("newtable".equals(args[0])) { + byte[] tname = args[1].getBytes(); + HTable table = util.createTable(tname, FAMILIES); + HBaseAdmin admin = new HBaseAdmin(conf); + 
admin.disableTable(tname); + byte[][] startKeys = generateRandomStartKeys(5); + util.createMultiRegions(conf, table, FAMILIES[0], startKeys); + admin.enableTable(tname); + } else if ("incremental".equals(args[0])) { + byte[] tname = args[1].getBytes(); + HTable table = new HTable(conf, tname); + Path outDir = new Path("incremental-out"); + runIncrementalPELoad(conf, table, outDir); + } else { + throw new RuntimeException( + "usage: TestHFileOutputFormat2 newtable | incremental"); + } + } + +} + Propchange: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java ------------------------------------------------------------------------------ svn:eol-style = native
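
A minimal sketch of a standalone bulk-load driver wired the same way as runIncrementalPELoad() and doIncrementalLoadTest() in the test above: the mapper emits ImmutableBytesWritable row keys with KeyValue values, HFileOutputFormat2.configureIncrementalLoad(job, table) configures the rest of the job, and LoadIncrementalHFiles moves the generated HFiles into the table. The class, table, and path names used here (BulkLoadDriver, RowToCellMapper, "usertable", the /tmp paths) are illustrative placeholders and are not part of this commit.

// Sketch only, not part of this commit: a minimal bulk-load driver mirroring
// runIncrementalPELoad()/doIncrementalLoadTest() above.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadDriver {

  // Placeholder mapper: turns each input line "rowkey<TAB>value" into one KeyValue
  // in family "f", qualifier "q", keyed by the row key.
  public static class RowToCellMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    @Override
    protected void map(LongWritable offset, Text line, Context ctx)
        throws IOException, InterruptedException {
      String[] parts = line.toString().split("\t", 2);
      byte[] row = Bytes.toBytes(parts[0]);
      byte[] value = Bytes.toBytes(parts.length > 1 ? parts[1] : "");
      ctx.write(new ImmutableBytesWritable(row),
          new KeyValue(row, Bytes.toBytes("f"), Bytes.toBytes("q"), value));
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "usertable");                 // placeholder table name

    Job job = new Job(conf, "prepare-hfiles");
    job.setJarByClass(BulkLoadDriver.class);
    job.setMapperClass(RowToCellMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/tmp/rows.tsv")); // placeholder input

    // Configures the sort reducer, the total-order partitioner, one reduce task per
    // table region, and per-family compression, as exercised by the tests above.
    HFileOutputFormat2.configureIncrementalLoad(job, table);

    Path out = new Path("/tmp/hfile-out");                        // placeholder output
    FileOutputFormat.setOutputPath(job, out);

    if (job.waitForCompletion(true)) {
      // Move the finished HFiles into the table's regions.
      new LoadIncrementalHFiles(conf).doBulkLoad(out, table);
    }
  }
}

As in the tests, the output directory receives one subdirectory per column family, and nothing is written to the table itself until doBulkLoad runs.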