From: misty@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Thu, 14 Apr 2016 18:48:32 -0000
Message-Id: <21dbf2ec55e14192b4ec1d83fcc81821@git.apache.org>
Subject: [44/51] [partial] hbase-site git commit: Published site at 7efb9edecbdf8b35046230575d504e4caeb80f34.

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/3c6f3528/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.html
----------------------------------------------------------------------
diff --git a/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.html b/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.html
index 9f165f1..9671728 100644
--- a/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.html
+++ b/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.html
@@ -73,730 +73,731 @@
065import org.apache.hadoop.hbase.regionserver.BloomType; 066import org.apache.hadoop.hbase.regionserver.HStore; 067import org.apache.hadoop.hbase.regionserver.StoreFile; -068import org.apache.hadoop.hbase.util.Bytes; -069import org.apache.hadoop.io.NullWritable; -070import org.apache.hadoop.io.SequenceFile; -071import org.apache.hadoop.io.Text; -072import org.apache.hadoop.mapreduce.Job; -073import org.apache.hadoop.mapreduce.OutputFormat; -074import org.apache.hadoop.mapreduce.RecordWriter; -075import org.apache.hadoop.mapreduce.TaskAttemptContext; -076import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -077import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -078import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; -079 -080import com.google.common.annotations.VisibleForTesting; -081 -082/** -083 * Writes HFiles. Passed Cells must arrive in order. -084 * Writes current time as the sequence id for the file. Sets the major compacted -085 * attribute on created @{link {@link HFile}s. Calling write(null,null) will forcibly roll -086 * all HFiles being written. -087 * <p> -088 * Using this class as part of a MapReduce job is best done -089 * using {@link #configureIncrementalLoad(Job, HTableDescriptor, RegionLocator, Class)}. 
-090 */ -091@InterfaceAudience.Public -092@InterfaceStability.Evolving -093public class HFileOutputFormat2 -094 extends FileOutputFormat<ImmutableBytesWritable, Cell> { -095 private static final Log LOG = LogFactory.getLog(HFileOutputFormat2.class); -096 -097 // The following constants are private since these are used by -098 // HFileOutputFormat2 to internally transfer data between job setup and -099 // reducer run using conf. -100 // These should not be changed by the client. -101 private static final String COMPRESSION_FAMILIES_CONF_KEY = -102 "hbase.hfileoutputformat.families.compression"; -103 private static final String BLOOM_TYPE_FAMILIES_CONF_KEY = -104 "hbase.hfileoutputformat.families.bloomtype"; -105 private static final String BLOCK_SIZE_FAMILIES_CONF_KEY = -106 "hbase.mapreduce.hfileoutputformat.blocksize"; -107 private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY = -108 "hbase.mapreduce.hfileoutputformat.families.datablock.encoding"; -109 -110 // This constant is public since the client can modify this when setting -111 // up their conf object and thus refer to this symbol. -112 // It is present for backwards compatibility reasons. Use it only to -113 // override the auto-detection of datablock encoding. -114 public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY = -115 "hbase.mapreduce.hfileoutputformat.datablock.encoding"; -116 -117 /** -118 * Keep locality while generating HFiles for bulkload. See HBASE-12596 -119 */ -120 public static final String LOCALITY_SENSITIVE_CONF_KEY = -121 "hbase.bulkload.locality.sensitive.enabled"; -122 private static final boolean DEFAULT_LOCALITY_SENSITIVE = true; -123 private static final String OUTPUT_TABLE_NAME_CONF_KEY = -124 "hbase.mapreduce.hfileoutputformat.table.name"; -125 -126 @Override -127 public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter( -128 final TaskAttemptContext context) throws IOException, InterruptedException { -129 return createRecordWriter(context); -130 } -131 -132 static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> -133 createRecordWriter(final TaskAttemptContext context) -134 throws IOException { -135 -136 // Get the path of the temporary output file -137 final Path outputPath = FileOutputFormat.getOutputPath(context); -138 final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath(); -139 final Configuration conf = context.getConfiguration(); -140 final FileSystem fs = outputdir.getFileSystem(conf); -141 // These configs. are from hbase-*.xml -142 final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, -143 HConstants.DEFAULT_MAX_FILE_SIZE); -144 // Invented config. Add to hbase-*.xml if other than default compression. 
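The class-level Javadoc in this hunk points at configureIncrementalLoad(Job, HTableDescriptor, RegionLocator, Class) as the intended way to use this output format in a MapReduce job. As a rough sketch only (the table name "mytable", the input/output paths, and the LineToPutMapper class are invented for illustration and are not part of this commit), a bulk-load preparation driver against the API shown in this file might look like the following; the generated HFiles are normally handed to the completebulkload tool (LoadIncrementalHFiles) afterwards.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadDriver {

  // Invented example mapper: each input line is "rowkey,value" and becomes one Put.
  public static class LineToPutMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable offset, Text line, Context ctx)
        throws java.io.IOException, InterruptedException {
      String[] parts = line.toString().split(",", 2);
      if (parts.length < 2) {
        return; // skip malformed lines
      }
      byte[] row = Bytes.toBytes(parts[0]);
      Put put = new Put(row);
      put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes(parts[1]));
      ctx.write(new ImmutableBytesWritable(row), put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Optional: force a data block encoding instead of the per-family auto-detection,
    // via the public override key defined above.
    // conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_OVERRIDE_CONF_KEY, "FAST_DIFF");

    Job job = Job.getInstance(conf, "prepare-hfiles-for-bulkload");
    job.setJarByClass(BulkLoadDriver.class);
    job.setMapperClass(LineToPutMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    // With Put as the map output value, configureIncrementalLoad picks PutSortReducer.
    job.setMapOutputValueClass(Put.class);
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    TableName name = TableName.valueOf("mytable");
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(name);
         RegionLocator locator = conn.getRegionLocator(name)) {
      // Sets the partitioner, the reducer, the number of reduce tasks, and the
      // per-family compression/bloom/block size/encoding settings described above.
      HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
    }
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}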
-145 final String defaultCompressionStr = conf.get("hfile.compression", -146 Compression.Algorithm.NONE.getName()); -147 final Algorithm defaultCompression = HFileWriterImpl -148 .compressionByName(defaultCompressionStr); -149 final boolean compactionExclude = conf.getBoolean( -150 "hbase.mapreduce.hfileoutputformat.compaction.exclude", false); -151 -152 // create a map from column family to the compression algorithm -153 final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf); -154 final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf); -155 final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf); -156 -157 String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY); -158 final Map<byte[], DataBlockEncoding> datablockEncodingMap -159 = createFamilyDataBlockEncodingMap(conf); -160 final DataBlockEncoding overriddenEncoding; -161 if (dataBlockEncodingStr != null) { -162 overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr); -163 } else { -164 overriddenEncoding = null; -165 } -166 -167 return new RecordWriter<ImmutableBytesWritable, V>() { -168 // Map of families to writers and how much has been output on the writer. -169 private final Map<byte [], WriterLength> writers = -170 new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR); -171 private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY; -172 private final byte [] now = Bytes.toBytes(System.currentTimeMillis()); -173 private boolean rollRequested = false; -174 -175 @Override -176 public void write(ImmutableBytesWritable row, V cell) -177 throws IOException { -178 KeyValue kv = KeyValueUtil.ensureKeyValue(cell); -179 -180 // null input == user explicitly wants to flush -181 if (row == null && kv == null) { -182 rollWriters(); -183 return; -184 } -185 -186 byte [] rowKey = CellUtil.cloneRow(kv); -187 long length = kv.getLength(); -188 byte [] family = CellUtil.cloneFamily(kv); -189 WriterLength wl = this.writers.get(family); -190 -191 // If this is a new column family, verify that the directory exists -192 if (wl == null) { -193 fs.mkdirs(new Path(outputdir, Bytes.toString(family))); -194 } -195 -196 // If any of the HFiles for the column families has reached -197 // maxsize, we need to roll all the writers -198 if (wl != null && wl.written + length >= maxsize) { -199 this.rollRequested = true; -200 } -201 -202 // This can only happen once a row is finished though -203 if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) { -204 rollWriters(); -205 } -206 -207 // create a new WAL writer, if necessary -208 if (wl == null || wl.writer == null) { -209 if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { -210 HRegionLocation loc = null; -211 String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY); -212 if (tableName != null) { -213 try (Connection connection = ConnectionFactory.createConnection(conf); -214 RegionLocator locator = -215 connection.getRegionLocator(TableName.valueOf(tableName))) { -216 loc = locator.getRegionLocation(rowKey); -217 } catch (Throwable e) { -218 LOG.warn("there's something wrong when locating rowkey: " + -219 Bytes.toString(rowKey), e); -220 loc = null; -221 } -222 } -223 -224 if (null == loc) { -225 if (LOG.isTraceEnabled()) { -226 LOG.trace("failed to get region location, so use default writer: " + -227 Bytes.toString(rowKey)); -228 } -229 wl = getNewWriter(family, conf, null); -230 } else { -231 if (LOG.isDebugEnabled()) { -232 LOG.debug("first rowkey: [" + 
Bytes.toString(rowKey) + "]"); -233 } -234 InetSocketAddress initialIsa = -235 new InetSocketAddress(loc.getHostname(), loc.getPort()); -236 if (initialIsa.isUnresolved()) { -237 if (LOG.isTraceEnabled()) { -238 LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":" -239 + loc.getPort() + ", so use default writer"); -240 } -241 wl = getNewWriter(family, conf, null); -242 } else { -243 if(LOG.isDebugEnabled()) { -244 LOG.debug("use favored nodes writer: " + initialIsa.getHostString()); -245 } -246 wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa }); -247 } -248 } -249 } else { -250 wl = getNewWriter(family, conf, null); -251 } -252 } -253 -254 // we now have the proper WAL writer. full steam ahead -255 kv.updateLatestStamp(this.now); -256 wl.writer.append(kv); -257 wl.written += length; -258 -259 // Copy the row so we know when a row transition. -260 this.previousRow = rowKey; -261 } -262 -263 private void rollWriters() throws IOException { -264 for (WriterLength wl : this.writers.values()) { -265 if (wl.writer != null) { -266 LOG.info("Writer=" + wl.writer.getPath() + -267 ((wl.written == 0)? "": ", wrote=" + wl.written)); -268 close(wl.writer); -269 } -270 wl.writer = null; -271 wl.written = 0; -272 } -273 this.rollRequested = false; -274 } -275 -276 /* Create a new StoreFile.Writer. -277 * @param family -278 * @return A WriterLength, containing a new StoreFile.Writer. -279 * @throws IOException -280 */ -281 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED", -282 justification="Not important") -283 private WriterLength getNewWriter(byte[] family, Configuration conf, -284 InetSocketAddress[] favoredNodes) throws IOException { -285 WriterLength wl = new WriterLength(); -286 Path familydir = new Path(outputdir, Bytes.toString(family)); -287 Algorithm compression = compressionMap.get(family); -288 compression = compression == null ? defaultCompression : compression; -289 BloomType bloomType = bloomTypeMap.get(family); -290 bloomType = bloomType == null ? BloomType.NONE : bloomType; -291 Integer blockSize = blockSizeMap.get(family); -292 blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize; -293 DataBlockEncoding encoding = overriddenEncoding; -294 encoding = encoding == null ? datablockEncodingMap.get(family) : encoding; -295 encoding = encoding == null ? 
DataBlockEncoding.NONE : encoding; -296 Configuration tempConf = new Configuration(conf); -297 tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); -298 HFileContextBuilder contextBuilder = new HFileContextBuilder() -299 .withCompression(compression) -300 .withChecksumType(HStore.getChecksumType(conf)) -301 .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) -302 .withBlockSize(blockSize); -303 contextBuilder.withDataBlockEncoding(encoding); -304 HFileContext hFileContext = contextBuilder.build(); -305 -306 if (null == favoredNodes) { -307 wl.writer = -308 new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs) -309 .withOutputDir(familydir).withBloomType(bloomType) -310 .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build(); -311 } else { -312 wl.writer = -313 new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) -314 .withOutputDir(familydir).withBloomType(bloomType) -315 .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext) -316 .withFavoredNodes(favoredNodes).build(); -317 } -318 -319 this.writers.put(family, wl); -320 return wl; -321 } -322 -323 private void close(final StoreFile.Writer w) throws IOException { -324 if (w != null) { -325 w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, -326 Bytes.toBytes(System.currentTimeMillis())); -327 w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, -328 Bytes.toBytes(context.getTaskAttemptID().toString())); -329 w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, -330 Bytes.toBytes(true)); -331 w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, -332 Bytes.toBytes(compactionExclude)); -333 w.appendTrackedTimestampsToMetadata(); -334 w.close(); -335 } -336 } -337 -338 @Override -339 public void close(TaskAttemptContext c) -340 throws IOException, InterruptedException { -341 for (WriterLength wl: this.writers.values()) { -342 close(wl.writer); -343 } -344 } -345 }; -346 } -347 -348 /* -349 * Data structure to hold a Writer and amount of data written on it. -350 */ -351 static class WriterLength { -352 long written = 0; -353 StoreFile.Writer writer = null; -354 } -355 -356 /** -357 * Return the start keys of all of the regions in this table, -358 * as a list of ImmutableBytesWritable. -359 */ -360 private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table) -361 throws IOException { -362 byte[][] byteKeys = table.getStartKeys(); -363 ArrayList<ImmutableBytesWritable> ret = -364 new ArrayList<ImmutableBytesWritable>(byteKeys.length); -365 for (byte[] byteKey : byteKeys) { -366 ret.add(new ImmutableBytesWritable(byteKey)); -367 } -368 return ret; -369 } -370 -371 /** -372 * Write out a {@link SequenceFile} that can be read by -373 * {@link TotalOrderPartitioner} that contains the split points in startKeys. -374 */ -375 @SuppressWarnings("deprecation") -376 private static void writePartitions(Configuration conf, Path partitionsPath, -377 List<ImmutableBytesWritable> startKeys) throws IOException { -378 LOG.info("Writing partition information to " + partitionsPath); -379 if (startKeys.isEmpty()) { -380 throw new IllegalArgumentException("No regions passed"); -381 } -382 -383 // We're generating a list of split points, and we don't ever -384 // have keys < the first region (which has an empty start key) -385 // so we need to remove it. 
Otherwise we would end up with an -386 // empty reducer with index 0 -387 TreeSet<ImmutableBytesWritable> sorted = -388 new TreeSet<ImmutableBytesWritable>(startKeys); -389 -390 ImmutableBytesWritable first = sorted.first(); -391 if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) { -392 throw new IllegalArgumentException( -393 "First region of table should have empty start key. Instead has: " -394 + Bytes.toStringBinary(first.get())); -395 } -396 sorted.remove(first); -397 -398 // Write the actual file -399 FileSystem fs = partitionsPath.getFileSystem(conf); -400 SequenceFile.Writer writer = SequenceFile.createWriter( -401 fs, conf, partitionsPath, ImmutableBytesWritable.class, -402 NullWritable.class); -403 -404 try { -405 for (ImmutableBytesWritable startKey : sorted) { -406 writer.append(startKey, NullWritable.get()); -407 } -408 } finally { -409 writer.close(); -410 } -411 } -412 -413 /** -414 * Configure a MapReduce Job to perform an incremental load into the given -415 * table. This -416 * <ul> -417 * <li>Inspects the table to configure a total order partitioner</li> -418 * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li> -419 * <li>Sets the number of reduce tasks to match the current number of regions</li> -420 * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li> -421 * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or -422 * PutSortReducer)</li> -423 * </ul> -424 * The user should be sure to set the map output value class to either KeyValue or Put before -425 * running this function. -426 */ -427 public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator) -428 throws IOException { -429 configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); -430 } -431 -432 /** -433 * Configure a MapReduce Job to perform an incremental load into the given -434 * table. This -435 * <ul> -436 * <li>Inspects the table to configure a total order partitioner</li> -437 * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li> -438 * <li>Sets the number of reduce tasks to match the current number of regions</li> -439 * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li> -440 * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or -441 * PutSortReducer)</li> -442 * </ul> -443 * The user should be sure to set the map output value class to either KeyValue or Put before -444 * running this function. -445 */ -446 public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, -447 RegionLocator regionLocator) throws IOException { -448 configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat2.class); -449 } -450 -451 static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, -452 RegionLocator regionLocator, Class<? extends OutputFormat<?, ?>> cls) throws IOException, -453 UnsupportedEncodingException { -454 Configuration conf = job.getConfiguration(); -455 job.setOutputKeyClass(ImmutableBytesWritable.class); -456 job.setOutputValueClass(KeyValue.class); -457 job.setOutputFormatClass(cls); -458 -459 // Based on the configured map output class, set the correct reducer to properly -460 // sort the incoming values. -461 // TODO it would be nice to pick one or the other of these formats. 
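The comment just above about removing the empty first start key (so reducer 0 is not left empty) can be made concrete with a small standalone sketch. This is illustration only: the keys are invented, and plain strings stand in for the ImmutableBytesWritable split points that writePartitions actually serializes into the SequenceFile.

import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

public class SplitPointSketch {
  public static void main(String[] args) {
    // One start key per region, as getRegionStartKeys would return them;
    // the first region of a table always has the empty start key.
    List<String> startKeys = Arrays.asList("", "m", "t");

    TreeSet<String> sorted = new TreeSet<>(startKeys);
    if (!sorted.first().isEmpty()) {
      throw new IllegalArgumentException("First region should have an empty start key");
    }
    // Drop the empty key: no key can sort below it, so keeping it would
    // leave the reducer at index 0 with nothing to write.
    sorted.remove(sorted.first());

    System.out.println("Split points for TotalOrderPartitioner: " + sorted); // [m, t]
    System.out.println("Reduce tasks (one per region): " + startKeys.size()); // 3
  }
}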
-462 if (KeyValue.class.equals(job.getMapOutputValueClass())) { -463 job.setReducerClass(KeyValueSortReducer.class); -464 } else if (Put.class.equals(job.getMapOutputValueClass())) { -465 job.setReducerClass(PutSortReducer.class); -466 } else if (Text.class.equals(job.getMapOutputValueClass())) { -467 job.setReducerClass(TextSortReducer.class); -468 } else { -469 LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); -470 } -471 -472 conf.setStrings("io.serializations", conf.get("io.serializations"), -473 MutationSerialization.class.getName(), ResultSerialization.class.getName(), -474 KeyValueSerialization.class.getName()); -475 -476 if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { -477 // record this table name for creating writer by favored nodes -478 LOG.info("bulkload locality sensitive enabled"); -479 conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString()); -480 } -481 -482 // Use table's region boundaries for TOP split points. -483 LOG.info("Looking up current regions for table " + regionLocator.getName()); -484 List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator); -485 LOG.info("Configuring " + startKeys.size() + " reduce partitions " + -486 "to match current region count"); -487 job.setNumReduceTasks(startKeys.size()); -488 -489 configurePartitioner(job, startKeys); -490 // Set compression algorithms based on column families -491 configureCompression(conf, tableDescriptor); -492 configureBloomType(tableDescriptor, conf); -493 configureBlockSize(tableDescriptor, conf); -494 configureDataBlockEncoding(tableDescriptor, conf); -495 -496 TableMapReduceUtil.addDependencyJars(job); -497 TableMapReduceUtil.initCredentials(job); -498 LOG.info("Incremental table " + regionLocator.getName() + " output configured."); -499 } -500 -501 public static void configureIncrementalLoadMap(Job job, HTableDescriptor tableDescriptor) throws -502 IOException { -503 Configuration conf = job.getConfiguration(); -504 -505 job.setOutputKeyClass(ImmutableBytesWritable.class); -506 job.setOutputValueClass(KeyValue.class); -507 job.setOutputFormatClass(HFileOutputFormat2.class); -508 -509 // Set compression algorithms based on column families -510 configureCompression(conf, tableDescriptor); -511 configureBloomType(tableDescriptor, conf); -512 configureBlockSize(tableDescriptor, conf); -513 configureDataBlockEncoding(tableDescriptor, conf); -514 -515 TableMapReduceUtil.addDependencyJars(job); -516 TableMapReduceUtil.initCredentials(job); -517 LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured."); -518 } -519 -520 /** -521 * Runs inside the task to deserialize column family to compression algorithm -522 * map from the configuration. 
-523 * -524 * @param conf to read the serialized values from -525 * @return a map from column family to the configured compression algorithm -526 */ -527 @VisibleForTesting -528 static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration -529 conf) { -530 Map<byte[], String> stringMap = createFamilyConfValueMap(conf, -531 COMPRESSION_FAMILIES_CONF_KEY); -532 Map<byte[], Algorithm> compressionMap = new TreeMap<byte[], -533 Algorithm>(Bytes.BYTES_COMPARATOR); -534 for (Map.Entry<byte[], String> e : stringMap.entrySet()) { -535 Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue()); -536 compressionMap.put(e.getKey(), algorithm); -537 } -538 return compressionMap; -539 } -540 -541 /** -542 * Runs inside the task to deserialize column family to bloom filter type -543 * map from the configuration. -544 * -545 * @param conf to read the serialized values from -546 * @return a map from column family to the the configured bloom filter type -547 */ -548 @VisibleForTesting -549 static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) { -550 Map<byte[], String> stringMap = createFamilyConfValueMap(conf, -551 BLOOM_TYPE_FAMILIES_CONF_KEY); -552 Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[], -553 BloomType>(Bytes.BYTES_COMPARATOR); -554 for (Map.Entry<byte[], String> e : stringMap.entrySet()) { -555 BloomType bloomType = BloomType.valueOf(e.getValue()); -556 bloomTypeMap.put(e.getKey(), bloomType); -557 } -558 return bloomTypeMap; -559 } -560 -561 /** -562 * Runs inside the task to deserialize column family to block size -563 * map from the configuration. -564 * -565 * @param conf to read the serialized values from -566 * @return a map from column family to the configured block size -567 */ -568 @VisibleForTesting -569 static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) { -570 Map<byte[], String> stringMap = createFamilyConfValueMap(conf, -571 BLOCK_SIZE_FAMILIES_CONF_KEY); -572 Map<byte[], Integer> blockSizeMap = new TreeMap<byte[], -573 Integer>(Bytes.BYTES_COMPARATOR); -574 for (Map.Entry<byte[], String> e : stringMap.entrySet()) { -575 Integer blockSize = Integer.parseInt(e.getValue()); -576 blockSizeMap.put(e.getKey(), blockSize); -577 } -578 return blockSizeMap; -579 } -580 -581 /** -582 * Runs inside the task to deserialize column family to data block encoding -583 * type map from the configuration. 
-584 * -585 * @param conf to read the serialized values from -586 * @return a map from column family to HFileDataBlockEncoder for the -587 * configured data block type for the family -588 */ -589 @VisibleForTesting -590 static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap( -591 Configuration conf) { -592 Map<byte[], String> stringMap = createFamilyConfValueMap(conf, -593 DATABLOCK_ENCODING_FAMILIES_CONF_KEY); -594 Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[], -595 DataBlockEncoding>(Bytes.BYTES_COMPARATOR); -596 for (Map.Entry<byte[], String> e : stringMap.entrySet()) { -597 encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue()))); -598 } -599 return encoderMap; -600 } -601 +068import org.apache.hadoop.hbase.regionserver.StoreFileWriter; +069import org.apache.hadoop.hbase.util.Bytes; +070import org.apache.hadoop.io.NullWritable; +071import org.apache.hadoop.io.SequenceFile; +072import org.apache.hadoop.io.Text; +073import org.apache.hadoop.mapreduce.Job; +074import org.apache.hadoop.mapreduce.OutputFormat; +075import org.apache.hadoop.mapreduce.RecordWriter; +076import org.apache.hadoop.mapreduce.TaskAttemptContext; +077import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +078import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +079import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; +080 +081import com.google.common.annotations.VisibleForTesting; +082 +083/** +084 * Writes HFiles. Passed Cells must arrive in order. +085 * Writes current time as the sequence id for the file. Sets the major compacted +086 * attribute on created @{link {@link HFile}s. Calling write(null,null) will forcibly roll +087 * all HFiles being written. +088 * <p> +089 * Using this class as part of a MapReduce job is best done +090 * using {@link #configureIncrementalLoad(Job, HTableDescriptor, RegionLocator, Class)}. +091 */ +092@InterfaceAudience.Public +093@InterfaceStability.Evolving +094public class HFileOutputFormat2 +095 extends FileOutputFormat<ImmutableBytesWritable, Cell> { +096 private static final Log LOG = LogFactory.getLog(HFileOutputFormat2.class); +097 +098 // The following constants are private since these are used by +099 // HFileOutputFormat2 to internally transfer data between job setup and +100 // reducer run using conf. +101 // These should not be changed by the client. +102 private static final String COMPRESSION_FAMILIES_CONF_KEY = +103 "hbase.hfileoutputformat.families.compression"; +104 private static final String BLOOM_TYPE_FAMILIES_CONF_KEY = +105 "hbase.hfileoutputformat.families.bloomtype"; +106 private static final String BLOCK_SIZE_FAMILIES_CONF_KEY = +107 "hbase.mapreduce.hfileoutputformat.blocksize"; +108 private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY = +109 "hbase.mapreduce.hfileoutputformat.families.datablock.encoding"; +110 +111 // This constant is public since the client can modify this when setting +112 // up their conf object and thus refer to this symbol. +113 // It is present for backwards compatibility reasons. Use it only to +114 // override the auto-detection of datablock encoding. +115 public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY = +116 "hbase.mapreduce.hfileoutputformat.datablock.encoding"; +117 +118 /** +119 * Keep locality while generating HFiles for bulkload. 
See HBASE-12596 +120 */ +121 public static final String LOCALITY_SENSITIVE_CONF_KEY = +122 "hbase.bulkload.locality.sensitive.enabled"; +123 private static final boolean DEFAULT_LOCALITY_SENSITIVE = true; +124 private static final String OUTPUT_TABLE_NAME_CONF_KEY = +125 "hbase.mapreduce.hfileoutputformat.table.name"; +126 +127 @Override +128 public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter( +129 final TaskAttemptContext context) throws IOException, InterruptedException { +130 return createRecordWriter(context); +131 } +132 +133 static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> +134 createRecordWriter(final TaskAttemptContext context) +135 throws IOException { +136 +137 // Get the path of the temporary output file +138 final Path outputPath = FileOutputFormat.getOutputPath(context); +139 final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath(); +140 final Configuration conf = context.getConfiguration(); +141 final FileSystem fs = outputdir.getFileSystem(conf); +142 // These configs. are from hbase-*.xml +143 final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, +144 HConstants.DEFAULT_MAX_FILE_SIZE); +145 // Invented config. Add to hbase-*.xml if other than default compression. +146 final String defaultCompressionStr = conf.get("hfile.compression", +147 Compression.Algorithm.NONE.getName()); +148 final Algorithm defaultCompression = HFileWriterImpl +149 .compressionByName(defaultCompressionStr); +150 final boolean compactionExclude = conf.getBoolean( +151 "hbase.mapreduce.hfileoutputformat.compaction.exclude", false); +152 +153 // create a map from column family to the compression algorithm +154 final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf); +155 final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf); +156 final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf); +157 +158 String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY); +159 final Map<byte[], DataBlockEncoding> datablockEncodingMap +160 = createFamilyDataBlockEncodingMap(conf); +161 final DataBlockEncoding overriddenEncoding; +162 if (dataBlockEncodingStr != null) { +163 overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr); +164 } else { +165 overriddenEncoding = null; +166 } +167 +168 return new RecordWriter<ImmutableBytesWritable, V>() { +169 // Map of families to writers and how much has been output on the writer. 
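For the locality-sensitive path introduced around HBASE-12596 and shown above: when hbase.bulkload.locality.sensitive.enabled is left at its default of true and configureIncrementalLoad has recorded the table name in the job configuration, the record writer looks up the region serving each writer's first row key and opens the store file with that region server as a favored node. Below is a minimal sketch of opting out, using only the public constant defined in this file; the surrounding class is boilerplate for the example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;

public class DisableLocalitySensitiveWriters {
  public static Configuration newJobConf() {
    Configuration conf = HBaseConfiguration.create();
    // Default is true. When false, every column family gets the default writer
    // and no region lookup or favored-nodes hint is attempted.
    conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
    return conf;
  }
}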
+170 private final Map<byte [], WriterLength> writers = +171 new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR); +172 private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY; +173 private final byte [] now = Bytes.toBytes(System.currentTimeMillis()); +174 private boolean rollRequested = false; +175 +176 @Override +177 public void write(ImmutableBytesWritable row, V cell) +178 throws IOException { +179 KeyValue kv = KeyValueUtil.ensureKeyValue(cell); +180 +181 // null input == user explicitly wants to flush +182 if (row == null && kv == null) { +183 rollWriters(); +184 return; +185 } +186 +187 byte [] rowKey = CellUtil.cloneRow(kv); +188 long length = kv.getLength(); +189 byte [] family = CellUtil.cloneFamily(kv); +190 WriterLength wl = this.writers.get(family); +191 +192 // If this is a new column family, verify that the directory exists +193 if (wl == null) { +194 fs.mkdirs(new Path(outputdir, Bytes.toString(family))); +195 } +196 +197 // If any of the HFiles for the column families has reached +198 // maxsize, we need to roll all the writers +199 if (wl != null && wl.written + length >= maxsize) { +200 this.rollRequested = true; +201 } +202 +203 // This can only happen once a row is finished though +204 if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) { +205 rollWriters(); +206 } +207 +208 // create a new WAL writer, if necessary +209 if (wl == null || wl.writer == null) { +210 if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { +211 HRegionLocation loc = null; +212 String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY); +213 if (tableName != null) { +214 try (Connection connection = ConnectionFactory.createConnection(conf); +215 RegionLocator locator = +216 connection.getRegionLocator(TableName.valueOf(tableName))) { +217 loc = locator.getRegionLocation(rowKey); +218 } catch (Throwable e) { +219 LOG.warn("there's something wrong when locating rowkey: " + +220 Bytes.toString(rowKey), e); +221 loc = null; +222 } +223 } +224 +225 if (null == loc) { +226 if (LOG.isTraceEnabled()) { +227 LOG.trace("failed to get region location, so use default writer: " + +228 Bytes.toString(rowKey)); +229 } +230 wl = getNewWriter(family, conf, null); +231 } else { +232 if (LOG.isDebugEnabled()) { +233 LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]"); +234 } +235 InetSocketAddress initialIsa = +236 new InetSocketAddress(loc.getHostname(), loc.getPort()); +237 if (initialIsa.isUnresolved()) { +238 if (LOG.isTraceEnabled()) { +239 LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":" +240 + loc.getPort() + ", so use default writer"); +241 } +242 wl = getNewWriter(family, conf, null); +243 } else { +244 if(LOG.isDebugEnabled()) { +245 LOG.debug("use favored nodes writer: " + initialIsa.getHostString()); +246 } +247 wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa }); +248 } +249 } +250 } else { +251 wl = getNewWriter(family, conf, null); +252 } +253 } +254 +255 // we now have the proper WAL writer. full steam ahead +256 kv.updateLatestStamp(this.now); +257 wl.writer.append(kv); +258 wl.written += length; +259 +260 // Copy the row so we know when a row transition. +261 this.previousRow = rowKey; +262 } +263 +264 private void rollWriters() throws IOException { +265 for (WriterLength wl : this.writers.values()) { +266 if (wl.writer != null) { +267 LOG.info("Writer=" + wl.writer.getPath() + +268 ((wl.written == 0)? 
"": ", wrote=" + wl.written)); +269 close(wl.writer); +270 } +271 wl.writer = null; +272 wl.written = 0; +273 } +274 this.rollRequested = false; +275 } +276 +277 /* Create a new StoreFile.Writer. +278 * @param family +279 * @return A WriterLength, containing a new StoreFile.Writer. +280 * @throws IOException +281 */ +282 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED", +283 justification="Not important") +284 private WriterLength getNewWriter(byte[] family, Configuration conf, +285 InetSocketAddress[] favoredNodes) throws IOException { +286 WriterLength wl = new WriterLength(); +287 Path familydir = new Path(outputdir, Bytes.toString(family)); +288 Algorithm compression = compressionMap.get(family); +289 compression = compression == null ? defaultCompression : compression; +290 BloomType bloomType = bloomTypeMap.get(family); +291 bloomType = bloomType == null ? BloomType.NONE : bloomType; +292 Integer blockSize = blockSizeMap.get(family); +293 blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize; +294 DataBlockEncoding encoding = overriddenEncoding; +295 encoding = encoding == null ? datablockEncodingMap.get(family) : encoding; +296 encoding = encoding == null ? DataBlockEncoding.NONE : encoding; +297 Configuration tempConf = new Configuration(conf); +298 tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); +299 HFileContextBuilder contextBuilder = new HFileContextBuilder() +300 .withCompression(compression) +301 .withChecksumType(HStore.getChecksumType(conf)) +302 .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) +303 .withBlockSize(blockSize); +304 contextBuilder.withDataBlockEncoding(encoding); +305 HFileContext hFileContext = contextBuilder.build(); +306 +307 if (null == favoredNodes) { +308 wl.writer = +309 new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs) +310 .withOutputDir(familydir).withBloomType(bloomType) +311 .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build(); +312 } else { +313 wl.writer = +314 new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) +315 .withOutputDir(familydir).withBloomType(bloomType) +316 .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext) +317 .withFavoredNodes(favoredNodes).build(); +318 } +319 +320 this.writers.put(family, wl); +321 return wl; +322 } +323 +324 private void close(final StoreFileWriter w) throws IOException { +325 if (w != null) { +326 w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, +327 Bytes.toBytes(System.currentTimeMillis())); +328 w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, +329 Bytes.toBytes(context.getTaskAttemptID().toString())); +330 w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, +331 Bytes.toBytes(true)); +332 w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, +333 Bytes.toBytes(compactionExclude)); +334 w.appendTrackedTimestampsToMetadata(); +335 w.close(); +336 } +337 } +338 +339 @Override +340 public void close(TaskAttemptContext c) +341 throws IOException, InterruptedException { +342 for (WriterLength wl: this.writers.values()) { +343 close(wl.writer); +344 } +345 } +346 }; +347 } +348 +349 /* +350 * Data structure to hold a Writer and amount of data written on it. +351 */ +352 static class WriterLength { +353 long written = 0; +354 StoreFileWriter writer = null; +355 } +356 +357 /** +358 * Return the start keys of all of the regions in this table, +359 * as a list of ImmutableBytesWritable. 
+360 */ +361 private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table) +362 throws IOException { +363 byte[][] byteKeys = table.getStartKeys(); +364 ArrayList<ImmutableBytesWritable> ret = +365 new ArrayList<ImmutableBytesWritable>(byteKeys.length); +366 for (byte[] byteKey : byteKeys) { +367 ret.add(new ImmutableBytesWritable(byteKey)); +368 } +369 return ret; +370 } +371 +372 /** +373 * Write out a {@link SequenceFile} that can be read by +374 * {@link TotalOrderPartitioner} that contains the split points in startKeys. +375 */ +376 @SuppressWarnings("deprecation") +377 private static void writePartitions(Configuration conf, Path partitionsPath, +378 List<ImmutableBytesWritable> startKeys) throws IOException { +379 LOG.info("Writing partition i